linux/drivers/block/drbd/drbd_receiver.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0-or-later
   2/*
   3   drbd_receiver.c
   4
   5   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
   6
   7   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   8   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   9   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
  10
  11 */
  12
  13
  14#include <linux/module.h>
  15
  16#include <linux/uaccess.h>
  17#include <net/sock.h>
  18
  19#include <linux/drbd.h>
  20#include <linux/fs.h>
  21#include <linux/file.h>
  22#include <linux/in.h>
  23#include <linux/mm.h>
  24#include <linux/memcontrol.h>
  25#include <linux/mm_inline.h>
  26#include <linux/slab.h>
  27#include <uapi/linux/sched/types.h>
  28#include <linux/sched/signal.h>
  29#include <linux/pkt_sched.h>
  30#define __KERNEL_SYSCALLS__
  31#include <linux/unistd.h>
  32#include <linux/vmalloc.h>
  33#include <linux/random.h>
  34#include <linux/string.h>
  35#include <linux/scatterlist.h>
  36#include "drbd_int.h"
  37#include "drbd_protocol.h"
  38#include "drbd_req.h"
  39#include "drbd_vli.h"
  40
  41#define PRO_FEATURES (DRBD_FF_TRIM|DRBD_FF_THIN_RESYNC|DRBD_FF_WSAME|DRBD_FF_WZEROES)
  42
  43struct packet_info {
  44        enum drbd_packet cmd;
  45        unsigned int size;
  46        unsigned int vnr;
  47        void *data;
  48};
  49
  50enum finish_epoch {
  51        FE_STILL_LIVE,
  52        FE_DESTROYED,
  53        FE_RECYCLED,
  54};
  55
  56static int drbd_do_features(struct drbd_connection *connection);
  57static int drbd_do_auth(struct drbd_connection *connection);
  58static int drbd_disconnected(struct drbd_peer_device *);
  59static void conn_wait_active_ee_empty(struct drbd_connection *connection);
  60static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
  61static int e_end_block(struct drbd_work *, int);
  62
  63
  64#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
  65
  66/*
  67 * some helper functions to deal with single linked page lists,
  68 * page->private being our "next" pointer.
  69 */
  70
  71/* If at least n pages are linked at head, get n pages off.
  72 * Otherwise, don't modify head, and return NULL.
  73 * Locking is the responsibility of the caller.
  74 */
  75static struct page *page_chain_del(struct page **head, int n)
  76{
  77        struct page *page;
  78        struct page *tmp;
  79
  80        BUG_ON(!n);
  81        BUG_ON(!head);
  82
  83        page = *head;
  84
  85        if (!page)
  86                return NULL;
  87
  88        while (page) {
  89                tmp = page_chain_next(page);
  90                if (--n == 0)
  91                        break; /* found sufficient pages */
  92                if (tmp == NULL)
  93                        /* insufficient pages, don't use any of them. */
  94                        return NULL;
  95                page = tmp;
  96        }
  97
  98        /* add end of list marker for the returned list */
  99        set_page_private(page, 0);
 100        /* actual return value, and adjustment of head */
 101        page = *head;
 102        *head = tmp;
 103        return page;
 104}
 105
 106/* may be used outside of locks to find the tail of a (usually short)
 107 * "private" page chain, before adding it back to a global chain head
 108 * with page_chain_add() under a spinlock. */
 109static struct page *page_chain_tail(struct page *page, int *len)
 110{
 111        struct page *tmp;
 112        int i = 1;
 113        while ((tmp = page_chain_next(page)))
 114                ++i, page = tmp;
 115        if (len)
 116                *len = i;
 117        return page;
 118}
 119
 120static int page_chain_free(struct page *page)
 121{
 122        struct page *tmp;
 123        int i = 0;
 124        page_chain_for_each_safe(page, tmp) {
 125                put_page(page);
 126                ++i;
 127        }
 128        return i;
 129}
 130
 131static void page_chain_add(struct page **head,
 132                struct page *chain_first, struct page *chain_last)
 133{
 134#if 1
 135        struct page *tmp;
 136        tmp = page_chain_tail(chain_first, NULL);
 137        BUG_ON(tmp != chain_last);
 138#endif
 139
 140        /* add chain to head */
 141        set_page_private(chain_last, (unsigned long)*head);
 142        *head = chain_first;
 143}
 144
 145static struct page *__drbd_alloc_pages(struct drbd_device *device,
 146                                       unsigned int number)
 147{
 148        struct page *page = NULL;
 149        struct page *tmp = NULL;
 150        unsigned int i = 0;
 151
 152        /* Yes, testing drbd_pp_vacant outside the lock is racy.
 153         * So what. It saves a spin_lock. */
 154        if (drbd_pp_vacant >= number) {
 155                spin_lock(&drbd_pp_lock);
 156                page = page_chain_del(&drbd_pp_pool, number);
 157                if (page)
 158                        drbd_pp_vacant -= number;
 159                spin_unlock(&drbd_pp_lock);
 160                if (page)
 161                        return page;
 162        }
 163
 164        /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
 165         * "criss-cross" setup, that might cause write-out on some other DRBD,
 166         * which in turn might block on the other node at this very place.  */
 167        for (i = 0; i < number; i++) {
 168                tmp = alloc_page(GFP_TRY);
 169                if (!tmp)
 170                        break;
 171                set_page_private(tmp, (unsigned long)page);
 172                page = tmp;
 173        }
 174
 175        if (i == number)
 176                return page;
 177
 178        /* Not enough pages immediately available this time.
 179         * No need to jump around here, drbd_alloc_pages will retry this
 180         * function "soon". */
 181        if (page) {
 182                tmp = page_chain_tail(page, NULL);
 183                spin_lock(&drbd_pp_lock);
 184                page_chain_add(&drbd_pp_pool, page, tmp);
 185                drbd_pp_vacant += i;
 186                spin_unlock(&drbd_pp_lock);
 187        }
 188        return NULL;
 189}
 190
 191static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
 192                                           struct list_head *to_be_freed)
 193{
 194        struct drbd_peer_request *peer_req, *tmp;
 195
 196        /* The EEs are always appended to the end of the list. Since
 197           they are sent in order over the wire, they have to finish
 198           in order. As soon as we see the first not finished we can
 199           stop to examine the list... */
 200
 201        list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
 202                if (drbd_peer_req_has_active_page(peer_req))
 203                        break;
 204                list_move(&peer_req->w.list, to_be_freed);
 205        }
 206}
 207
 208static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
 209{
 210        LIST_HEAD(reclaimed);
 211        struct drbd_peer_request *peer_req, *t;
 212
 213        spin_lock_irq(&device->resource->req_lock);
 214        reclaim_finished_net_peer_reqs(device, &reclaimed);
 215        spin_unlock_irq(&device->resource->req_lock);
 216        list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
 217                drbd_free_net_peer_req(device, peer_req);
 218}
 219
 220static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
 221{
 222        struct drbd_peer_device *peer_device;
 223        int vnr;
 224
 225        rcu_read_lock();
 226        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
 227                struct drbd_device *device = peer_device->device;
 228                if (!atomic_read(&device->pp_in_use_by_net))
 229                        continue;
 230
 231                kref_get(&device->kref);
 232                rcu_read_unlock();
 233                drbd_reclaim_net_peer_reqs(device);
 234                kref_put(&device->kref, drbd_destroy_device);
 235                rcu_read_lock();
 236        }
 237        rcu_read_unlock();
 238}
 239
 240/**
 241 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 242 * @device:     DRBD device.
 243 * @number:     number of pages requested
 244 * @retry:      whether to retry, if not enough pages are available right now
 245 *
 246 * Tries to allocate number pages, first from our own page pool, then from
 247 * the kernel.
 248 * Possibly retry until DRBD frees sufficient pages somewhere else.
 249 *
 250 * If this allocation would exceed the max_buffers setting, we throttle
 251 * allocation (schedule_timeout) to give the system some room to breathe.
 252 *
 253 * We do not use max-buffers as hard limit, because it could lead to
 254 * congestion and further to a distributed deadlock during online-verify or
 255 * (checksum based) resync, if the max-buffers, socket buffer sizes and
 256 * resync-rate settings are mis-configured.
 257 *
 258 * Returns a page chain linked via page->private.
 259 */
 260struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
 261                              bool retry)
 262{
 263        struct drbd_device *device = peer_device->device;
 264        struct page *page = NULL;
 265        struct net_conf *nc;
 266        DEFINE_WAIT(wait);
 267        unsigned int mxb;
 268
 269        rcu_read_lock();
 270        nc = rcu_dereference(peer_device->connection->net_conf);
 271        mxb = nc ? nc->max_buffers : 1000000;
 272        rcu_read_unlock();
 273
 274        if (atomic_read(&device->pp_in_use) < mxb)
 275                page = __drbd_alloc_pages(device, number);
 276
 277        /* Try to keep the fast path fast, but occasionally we need
 278         * to reclaim the pages we lended to the network stack. */
 279        if (page && atomic_read(&device->pp_in_use_by_net) > 512)
 280                drbd_reclaim_net_peer_reqs(device);
 281
 282        while (page == NULL) {
 283                prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
 284
 285                drbd_reclaim_net_peer_reqs(device);
 286
 287                if (atomic_read(&device->pp_in_use) < mxb) {
 288                        page = __drbd_alloc_pages(device, number);
 289                        if (page)
 290                                break;
 291                }
 292
 293                if (!retry)
 294                        break;
 295
 296                if (signal_pending(current)) {
 297                        drbd_warn(device, "drbd_alloc_pages interrupted!\n");
 298                        break;
 299                }
 300
 301                if (schedule_timeout(HZ/10) == 0)
 302                        mxb = UINT_MAX;
 303        }
 304        finish_wait(&drbd_pp_wait, &wait);
 305
 306        if (page)
 307                atomic_add(number, &device->pp_in_use);
 308        return page;
 309}
 310
 311/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 312 * Is also used from inside an other spin_lock_irq(&resource->req_lock);
 313 * Either links the page chain back to the global pool,
 314 * or returns all pages to the system. */
 315static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
 316{
 317        atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
 318        int i;
 319
 320        if (page == NULL)
 321                return;
 322
 323        if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * drbd_minor_count)
 324                i = page_chain_free(page);
 325        else {
 326                struct page *tmp;
 327                tmp = page_chain_tail(page, &i);
 328                spin_lock(&drbd_pp_lock);
 329                page_chain_add(&drbd_pp_pool, page, tmp);
 330                drbd_pp_vacant += i;
 331                spin_unlock(&drbd_pp_lock);
 332        }
 333        i = atomic_sub_return(i, a);
 334        if (i < 0)
 335                drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
 336                        is_net ? "pp_in_use_by_net" : "pp_in_use", i);
 337        wake_up(&drbd_pp_wait);
 338}
 339
 340/*
 341You need to hold the req_lock:
 342 _drbd_wait_ee_list_empty()
 343
 344You must not have the req_lock:
 345 drbd_free_peer_req()
 346 drbd_alloc_peer_req()
 347 drbd_free_peer_reqs()
 348 drbd_ee_fix_bhs()
 349 drbd_finish_peer_reqs()
 350 drbd_clear_done_ee()
 351 drbd_wait_ee_list_empty()
 352*/
 353
 354/* normal: payload_size == request size (bi_size)
 355 * w_same: payload_size == logical_block_size
 356 * trim: payload_size == 0 */
 357struct drbd_peer_request *
 358drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
 359                    unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local)
 360{
 361        struct drbd_device *device = peer_device->device;
 362        struct drbd_peer_request *peer_req;
 363        struct page *page = NULL;
 364        unsigned nr_pages = (payload_size + PAGE_SIZE -1) >> PAGE_SHIFT;
 365
 366        if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
 367                return NULL;
 368
 369        peer_req = mempool_alloc(&drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
 370        if (!peer_req) {
 371                if (!(gfp_mask & __GFP_NOWARN))
 372                        drbd_err(device, "%s: allocation failed\n", __func__);
 373                return NULL;
 374        }
 375
 376        if (nr_pages) {
 377                page = drbd_alloc_pages(peer_device, nr_pages,
 378                                        gfpflags_allow_blocking(gfp_mask));
 379                if (!page)
 380                        goto fail;
 381        }
 382
 383        memset(peer_req, 0, sizeof(*peer_req));
 384        INIT_LIST_HEAD(&peer_req->w.list);
 385        drbd_clear_interval(&peer_req->i);
 386        peer_req->i.size = request_size;
 387        peer_req->i.sector = sector;
 388        peer_req->submit_jif = jiffies;
 389        peer_req->peer_device = peer_device;
 390        peer_req->pages = page;
 391        /*
 392         * The block_id is opaque to the receiver.  It is not endianness
 393         * converted, and sent back to the sender unchanged.
 394         */
 395        peer_req->block_id = id;
 396
 397        return peer_req;
 398
 399 fail:
 400        mempool_free(peer_req, &drbd_ee_mempool);
 401        return NULL;
 402}
 403
 404void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
 405                       int is_net)
 406{
 407        might_sleep();
 408        if (peer_req->flags & EE_HAS_DIGEST)
 409                kfree(peer_req->digest);
 410        drbd_free_pages(device, peer_req->pages, is_net);
 411        D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
 412        D_ASSERT(device, drbd_interval_empty(&peer_req->i));
 413        if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
 414                peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
 415                drbd_al_complete_io(device, &peer_req->i);
 416        }
 417        mempool_free(peer_req, &drbd_ee_mempool);
 418}
 419
 420int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
 421{
 422        LIST_HEAD(work_list);
 423        struct drbd_peer_request *peer_req, *t;
 424        int count = 0;
 425        int is_net = list == &device->net_ee;
 426
 427        spin_lock_irq(&device->resource->req_lock);
 428        list_splice_init(list, &work_list);
 429        spin_unlock_irq(&device->resource->req_lock);
 430
 431        list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
 432                __drbd_free_peer_req(device, peer_req, is_net);
 433                count++;
 434        }
 435        return count;
 436}
 437
 438/*
 439 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
 440 */
 441static int drbd_finish_peer_reqs(struct drbd_device *device)
 442{
 443        LIST_HEAD(work_list);
 444        LIST_HEAD(reclaimed);
 445        struct drbd_peer_request *peer_req, *t;
 446        int err = 0;
 447
 448        spin_lock_irq(&device->resource->req_lock);
 449        reclaim_finished_net_peer_reqs(device, &reclaimed);
 450        list_splice_init(&device->done_ee, &work_list);
 451        spin_unlock_irq(&device->resource->req_lock);
 452
 453        list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
 454                drbd_free_net_peer_req(device, peer_req);
 455
 456        /* possible callbacks here:
 457         * e_end_block, and e_end_resync_block, e_send_superseded.
 458         * all ignore the last argument.
 459         */
 460        list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
 461                int err2;
 462
 463                /* list_del not necessary, next/prev members not touched */
 464                err2 = peer_req->w.cb(&peer_req->w, !!err);
 465                if (!err)
 466                        err = err2;
 467                drbd_free_peer_req(device, peer_req);
 468        }
 469        wake_up(&device->ee_wait);
 470
 471        return err;
 472}
 473
 474static void _drbd_wait_ee_list_empty(struct drbd_device *device,
 475                                     struct list_head *head)
 476{
 477        DEFINE_WAIT(wait);
 478
 479        /* avoids spin_lock/unlock
 480         * and calling prepare_to_wait in the fast path */
 481        while (!list_empty(head)) {
 482                prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
 483                spin_unlock_irq(&device->resource->req_lock);
 484                io_schedule();
 485                finish_wait(&device->ee_wait, &wait);
 486                spin_lock_irq(&device->resource->req_lock);
 487        }
 488}
 489
 490static void drbd_wait_ee_list_empty(struct drbd_device *device,
 491                                    struct list_head *head)
 492{
 493        spin_lock_irq(&device->resource->req_lock);
 494        _drbd_wait_ee_list_empty(device, head);
 495        spin_unlock_irq(&device->resource->req_lock);
 496}
 497
 498static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
 499{
 500        struct kvec iov = {
 501                .iov_base = buf,
 502                .iov_len = size,
 503        };
 504        struct msghdr msg = {
 505                .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
 506        };
 507        iov_iter_kvec(&msg.msg_iter, READ, &iov, 1, size);
 508        return sock_recvmsg(sock, &msg, msg.msg_flags);
 509}
 510
 511static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
 512{
 513        int rv;
 514
 515        rv = drbd_recv_short(connection->data.socket, buf, size, 0);
 516
 517        if (rv < 0) {
 518                if (rv == -ECONNRESET)
 519                        drbd_info(connection, "sock was reset by peer\n");
 520                else if (rv != -ERESTARTSYS)
 521                        drbd_err(connection, "sock_recvmsg returned %d\n", rv);
 522        } else if (rv == 0) {
 523                if (test_bit(DISCONNECT_SENT, &connection->flags)) {
 524                        long t;
 525                        rcu_read_lock();
 526                        t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
 527                        rcu_read_unlock();
 528
 529                        t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
 530
 531                        if (t)
 532                                goto out;
 533                }
 534                drbd_info(connection, "sock was shut down by peer\n");
 535        }
 536
 537        if (rv != size)
 538                conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
 539
 540out:
 541        return rv;
 542}
 543
 544static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
 545{
 546        int err;
 547
 548        err = drbd_recv(connection, buf, size);
 549        if (err != size) {
 550                if (err >= 0)
 551                        err = -EIO;
 552        } else
 553                err = 0;
 554        return err;
 555}
 556
 557static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
 558{
 559        int err;
 560
 561        err = drbd_recv_all(connection, buf, size);
 562        if (err && !signal_pending(current))
 563                drbd_warn(connection, "short read (expected size %d)\n", (int)size);
 564        return err;
 565}
 566
 567/* quoting tcp(7):
 568 *   On individual connections, the socket buffer size must be set prior to the
 569 *   listen(2) or connect(2) calls in order to have it take effect.
 570 * This is our wrapper to do so.
 571 */
 572static void drbd_setbufsize(struct socket *sock, unsigned int snd,
 573                unsigned int rcv)
 574{
 575        /* open coded SO_SNDBUF, SO_RCVBUF */
 576        if (snd) {
 577                sock->sk->sk_sndbuf = snd;
 578                sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
 579        }
 580        if (rcv) {
 581                sock->sk->sk_rcvbuf = rcv;
 582                sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
 583        }
 584}
 585
 586static struct socket *drbd_try_connect(struct drbd_connection *connection)
 587{
 588        const char *what;
 589        struct socket *sock;
 590        struct sockaddr_in6 src_in6;
 591        struct sockaddr_in6 peer_in6;
 592        struct net_conf *nc;
 593        int err, peer_addr_len, my_addr_len;
 594        int sndbuf_size, rcvbuf_size, connect_int;
 595        int disconnect_on_error = 1;
 596
 597        rcu_read_lock();
 598        nc = rcu_dereference(connection->net_conf);
 599        if (!nc) {
 600                rcu_read_unlock();
 601                return NULL;
 602        }
 603        sndbuf_size = nc->sndbuf_size;
 604        rcvbuf_size = nc->rcvbuf_size;
 605        connect_int = nc->connect_int;
 606        rcu_read_unlock();
 607
 608        my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
 609        memcpy(&src_in6, &connection->my_addr, my_addr_len);
 610
 611        if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
 612                src_in6.sin6_port = 0;
 613        else
 614                ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
 615
 616        peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
 617        memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
 618
 619        what = "sock_create_kern";
 620        err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
 621                               SOCK_STREAM, IPPROTO_TCP, &sock);
 622        if (err < 0) {
 623                sock = NULL;
 624                goto out;
 625        }
 626
 627        sock->sk->sk_rcvtimeo =
 628        sock->sk->sk_sndtimeo = connect_int * HZ;
 629        drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
 630
 631       /* explicitly bind to the configured IP as source IP
 632        *  for the outgoing connections.
 633        *  This is needed for multihomed hosts and to be
 634        *  able to use lo: interfaces for drbd.
 635        * Make sure to use 0 as port number, so linux selects
 636        *  a free one dynamically.
 637        */
 638        what = "bind before connect";
 639        err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
 640        if (err < 0)
 641                goto out;
 642
 643        /* connect may fail, peer not yet available.
 644         * stay C_WF_CONNECTION, don't go Disconnecting! */
 645        disconnect_on_error = 0;
 646        what = "connect";
 647        err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
 648
 649out:
 650        if (err < 0) {
 651                if (sock) {
 652                        sock_release(sock);
 653                        sock = NULL;
 654                }
 655                switch (-err) {
 656                        /* timeout, busy, signal pending */
 657                case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
 658                case EINTR: case ERESTARTSYS:
 659                        /* peer not (yet) available, network problem */
 660                case ECONNREFUSED: case ENETUNREACH:
 661                case EHOSTDOWN:    case EHOSTUNREACH:
 662                        disconnect_on_error = 0;
 663                        break;
 664                default:
 665                        drbd_err(connection, "%s failed, err = %d\n", what, err);
 666                }
 667                if (disconnect_on_error)
 668                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
 669        }
 670
 671        return sock;
 672}
 673
 674struct accept_wait_data {
 675        struct drbd_connection *connection;
 676        struct socket *s_listen;
 677        struct completion door_bell;
 678        void (*original_sk_state_change)(struct sock *sk);
 679
 680};
 681
 682static void drbd_incoming_connection(struct sock *sk)
 683{
 684        struct accept_wait_data *ad = sk->sk_user_data;
 685        void (*state_change)(struct sock *sk);
 686
 687        state_change = ad->original_sk_state_change;
 688        if (sk->sk_state == TCP_ESTABLISHED)
 689                complete(&ad->door_bell);
 690        state_change(sk);
 691}
 692
 693static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
 694{
 695        int err, sndbuf_size, rcvbuf_size, my_addr_len;
 696        struct sockaddr_in6 my_addr;
 697        struct socket *s_listen;
 698        struct net_conf *nc;
 699        const char *what;
 700
 701        rcu_read_lock();
 702        nc = rcu_dereference(connection->net_conf);
 703        if (!nc) {
 704                rcu_read_unlock();
 705                return -EIO;
 706        }
 707        sndbuf_size = nc->sndbuf_size;
 708        rcvbuf_size = nc->rcvbuf_size;
 709        rcu_read_unlock();
 710
 711        my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
 712        memcpy(&my_addr, &connection->my_addr, my_addr_len);
 713
 714        what = "sock_create_kern";
 715        err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
 716                               SOCK_STREAM, IPPROTO_TCP, &s_listen);
 717        if (err) {
 718                s_listen = NULL;
 719                goto out;
 720        }
 721
 722        s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
 723        drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
 724
 725        what = "bind before listen";
 726        err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
 727        if (err < 0)
 728                goto out;
 729
 730        ad->s_listen = s_listen;
 731        write_lock_bh(&s_listen->sk->sk_callback_lock);
 732        ad->original_sk_state_change = s_listen->sk->sk_state_change;
 733        s_listen->sk->sk_state_change = drbd_incoming_connection;
 734        s_listen->sk->sk_user_data = ad;
 735        write_unlock_bh(&s_listen->sk->sk_callback_lock);
 736
 737        what = "listen";
 738        err = s_listen->ops->listen(s_listen, 5);
 739        if (err < 0)
 740                goto out;
 741
 742        return 0;
 743out:
 744        if (s_listen)
 745                sock_release(s_listen);
 746        if (err < 0) {
 747                if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
 748                        drbd_err(connection, "%s failed, err = %d\n", what, err);
 749                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
 750                }
 751        }
 752
 753        return -EIO;
 754}
 755
 756static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
 757{
 758        write_lock_bh(&sk->sk_callback_lock);
 759        sk->sk_state_change = ad->original_sk_state_change;
 760        sk->sk_user_data = NULL;
 761        write_unlock_bh(&sk->sk_callback_lock);
 762}
 763
 764static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
 765{
 766        int timeo, connect_int, err = 0;
 767        struct socket *s_estab = NULL;
 768        struct net_conf *nc;
 769
 770        rcu_read_lock();
 771        nc = rcu_dereference(connection->net_conf);
 772        if (!nc) {
 773                rcu_read_unlock();
 774                return NULL;
 775        }
 776        connect_int = nc->connect_int;
 777        rcu_read_unlock();
 778
 779        timeo = connect_int * HZ;
 780        /* 28.5% random jitter */
 781        timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
 782
 783        err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
 784        if (err <= 0)
 785                return NULL;
 786
 787        err = kernel_accept(ad->s_listen, &s_estab, 0);
 788        if (err < 0) {
 789                if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
 790                        drbd_err(connection, "accept failed, err = %d\n", err);
 791                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
 792                }
 793        }
 794
 795        if (s_estab)
 796                unregister_state_change(s_estab->sk, ad);
 797
 798        return s_estab;
 799}
 800
 801static int decode_header(struct drbd_connection *, void *, struct packet_info *);
 802
 803static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
 804                             enum drbd_packet cmd)
 805{
 806        if (!conn_prepare_command(connection, sock))
 807                return -EIO;
 808        return conn_send_command(connection, sock, cmd, 0, NULL, 0);
 809}
 810
 811static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
 812{
 813        unsigned int header_size = drbd_header_size(connection);
 814        struct packet_info pi;
 815        struct net_conf *nc;
 816        int err;
 817
 818        rcu_read_lock();
 819        nc = rcu_dereference(connection->net_conf);
 820        if (!nc) {
 821                rcu_read_unlock();
 822                return -EIO;
 823        }
 824        sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
 825        rcu_read_unlock();
 826
 827        err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
 828        if (err != header_size) {
 829                if (err >= 0)
 830                        err = -EIO;
 831                return err;
 832        }
 833        err = decode_header(connection, connection->data.rbuf, &pi);
 834        if (err)
 835                return err;
 836        return pi.cmd;
 837}
 838
 839/**
 840 * drbd_socket_okay() - Free the socket if its connection is not okay
 841 * @sock:       pointer to the pointer to the socket.
 842 */
 843static bool drbd_socket_okay(struct socket **sock)
 844{
 845        int rr;
 846        char tb[4];
 847
 848        if (!*sock)
 849                return false;
 850
 851        rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
 852
 853        if (rr > 0 || rr == -EAGAIN) {
 854                return true;
 855        } else {
 856                sock_release(*sock);
 857                *sock = NULL;
 858                return false;
 859        }
 860}
 861
 862static bool connection_established(struct drbd_connection *connection,
 863                                   struct socket **sock1,
 864                                   struct socket **sock2)
 865{
 866        struct net_conf *nc;
 867        int timeout;
 868        bool ok;
 869
 870        if (!*sock1 || !*sock2)
 871                return false;
 872
 873        rcu_read_lock();
 874        nc = rcu_dereference(connection->net_conf);
 875        timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
 876        rcu_read_unlock();
 877        schedule_timeout_interruptible(timeout);
 878
 879        ok = drbd_socket_okay(sock1);
 880        ok = drbd_socket_okay(sock2) && ok;
 881
 882        return ok;
 883}
 884
 885/* Gets called if a connection is established, or if a new minor gets created
 886   in a connection */
 887int drbd_connected(struct drbd_peer_device *peer_device)
 888{
 889        struct drbd_device *device = peer_device->device;
 890        int err;
 891
 892        atomic_set(&device->packet_seq, 0);
 893        device->peer_seq = 0;
 894
 895        device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
 896                &peer_device->connection->cstate_mutex :
 897                &device->own_state_mutex;
 898
 899        err = drbd_send_sync_param(peer_device);
 900        if (!err)
 901                err = drbd_send_sizes(peer_device, 0, 0);
 902        if (!err)
 903                err = drbd_send_uuids(peer_device);
 904        if (!err)
 905                err = drbd_send_current_state(peer_device);
 906        clear_bit(USE_DEGR_WFC_T, &device->flags);
 907        clear_bit(RESIZE_PENDING, &device->flags);
 908        atomic_set(&device->ap_in_flight, 0);
 909        mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
 910        return err;
 911}
 912
 913/*
 914 * return values:
 915 *   1 yes, we have a valid connection
 916 *   0 oops, did not work out, please try again
 917 *  -1 peer talks different language,
 918 *     no point in trying again, please go standalone.
 919 *  -2 We do not have a network config...
 920 */
 921static int conn_connect(struct drbd_connection *connection)
 922{
 923        struct drbd_socket sock, msock;
 924        struct drbd_peer_device *peer_device;
 925        struct net_conf *nc;
 926        int vnr, timeout, h;
 927        bool discard_my_data, ok;
 928        enum drbd_state_rv rv;
 929        struct accept_wait_data ad = {
 930                .connection = connection,
 931                .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
 932        };
 933
 934        clear_bit(DISCONNECT_SENT, &connection->flags);
 935        if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
 936                return -2;
 937
 938        mutex_init(&sock.mutex);
 939        sock.sbuf = connection->data.sbuf;
 940        sock.rbuf = connection->data.rbuf;
 941        sock.socket = NULL;
 942        mutex_init(&msock.mutex);
 943        msock.sbuf = connection->meta.sbuf;
 944        msock.rbuf = connection->meta.rbuf;
 945        msock.socket = NULL;
 946
 947        /* Assume that the peer only understands protocol 80 until we know better.  */
 948        connection->agreed_pro_version = 80;
 949
 950        if (prepare_listen_socket(connection, &ad))
 951                return 0;
 952
 953        do {
 954                struct socket *s;
 955
 956                s = drbd_try_connect(connection);
 957                if (s) {
 958                        if (!sock.socket) {
 959                                sock.socket = s;
 960                                send_first_packet(connection, &sock, P_INITIAL_DATA);
 961                        } else if (!msock.socket) {
 962                                clear_bit(RESOLVE_CONFLICTS, &connection->flags);
 963                                msock.socket = s;
 964                                send_first_packet(connection, &msock, P_INITIAL_META);
 965                        } else {
 966                                drbd_err(connection, "Logic error in conn_connect()\n");
 967                                goto out_release_sockets;
 968                        }
 969                }
 970
 971                if (connection_established(connection, &sock.socket, &msock.socket))
 972                        break;
 973
 974retry:
 975                s = drbd_wait_for_connect(connection, &ad);
 976                if (s) {
 977                        int fp = receive_first_packet(connection, s);
 978                        drbd_socket_okay(&sock.socket);
 979                        drbd_socket_okay(&msock.socket);
 980                        switch (fp) {
 981                        case P_INITIAL_DATA:
 982                                if (sock.socket) {
 983                                        drbd_warn(connection, "initial packet S crossed\n");
 984                                        sock_release(sock.socket);
 985                                        sock.socket = s;
 986                                        goto randomize;
 987                                }
 988                                sock.socket = s;
 989                                break;
 990                        case P_INITIAL_META:
 991                                set_bit(RESOLVE_CONFLICTS, &connection->flags);
 992                                if (msock.socket) {
 993                                        drbd_warn(connection, "initial packet M crossed\n");
 994                                        sock_release(msock.socket);
 995                                        msock.socket = s;
 996                                        goto randomize;
 997                                }
 998                                msock.socket = s;
 999                                break;
1000                        default:
1001                                drbd_warn(connection, "Error receiving initial packet\n");
1002                                sock_release(s);
1003randomize:
1004                                if (prandom_u32() & 1)
1005                                        goto retry;
1006                        }
1007                }
1008
1009                if (connection->cstate <= C_DISCONNECTING)
1010                        goto out_release_sockets;
1011                if (signal_pending(current)) {
1012                        flush_signals(current);
1013                        smp_rmb();
1014                        if (get_t_state(&connection->receiver) == EXITING)
1015                                goto out_release_sockets;
1016                }
1017
1018                ok = connection_established(connection, &sock.socket, &msock.socket);
1019        } while (!ok);
1020
1021        if (ad.s_listen)
1022                sock_release(ad.s_listen);
1023
1024        sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1025        msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1026
1027        sock.socket->sk->sk_allocation = GFP_NOIO;
1028        msock.socket->sk->sk_allocation = GFP_NOIO;
1029
1030        sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1031        msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1032
1033        /* NOT YET ...
1034         * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1035         * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1036         * first set it to the P_CONNECTION_FEATURES timeout,
1037         * which we set to 4x the configured ping_timeout. */
1038        rcu_read_lock();
1039        nc = rcu_dereference(connection->net_conf);
1040
1041        sock.socket->sk->sk_sndtimeo =
1042        sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1043
1044        msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1045        timeout = nc->timeout * HZ / 10;
1046        discard_my_data = nc->discard_my_data;
1047        rcu_read_unlock();
1048
1049        msock.socket->sk->sk_sndtimeo = timeout;
1050
1051        /* we don't want delays.
1052         * we use TCP_CORK where appropriate, though */
1053        drbd_tcp_nodelay(sock.socket);
1054        drbd_tcp_nodelay(msock.socket);
1055
1056        connection->data.socket = sock.socket;
1057        connection->meta.socket = msock.socket;
1058        connection->last_received = jiffies;
1059
1060        h = drbd_do_features(connection);
1061        if (h <= 0)
1062                return h;
1063
1064        if (connection->cram_hmac_tfm) {
1065                /* drbd_request_state(device, NS(conn, WFAuth)); */
1066                switch (drbd_do_auth(connection)) {
1067                case -1:
1068                        drbd_err(connection, "Authentication of peer failed\n");
1069                        return -1;
1070                case 0:
1071                        drbd_err(connection, "Authentication of peer failed, trying again.\n");
1072                        return 0;
1073                }
1074        }
1075
1076        connection->data.socket->sk->sk_sndtimeo = timeout;
1077        connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1078
1079        if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1080                return -1;
1081
1082        /* Prevent a race between resync-handshake and
1083         * being promoted to Primary.
1084         *
1085         * Grab and release the state mutex, so we know that any current
1086         * drbd_set_role() is finished, and any incoming drbd_set_role
1087         * will see the STATE_SENT flag, and wait for it to be cleared.
1088         */
1089        idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1090                mutex_lock(peer_device->device->state_mutex);
1091
1092        /* avoid a race with conn_request_state( C_DISCONNECTING ) */
1093        spin_lock_irq(&connection->resource->req_lock);
1094        set_bit(STATE_SENT, &connection->flags);
1095        spin_unlock_irq(&connection->resource->req_lock);
1096
1097        idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1098                mutex_unlock(peer_device->device->state_mutex);
1099
1100        rcu_read_lock();
1101        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1102                struct drbd_device *device = peer_device->device;
1103                kref_get(&device->kref);
1104                rcu_read_unlock();
1105
1106                if (discard_my_data)
1107                        set_bit(DISCARD_MY_DATA, &device->flags);
1108                else
1109                        clear_bit(DISCARD_MY_DATA, &device->flags);
1110
1111                drbd_connected(peer_device);
1112                kref_put(&device->kref, drbd_destroy_device);
1113                rcu_read_lock();
1114        }
1115        rcu_read_unlock();
1116
1117        rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1118        if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1119                clear_bit(STATE_SENT, &connection->flags);
1120                return 0;
1121        }
1122
1123        drbd_thread_start(&connection->ack_receiver);
1124        /* opencoded create_singlethread_workqueue(),
1125         * to be able to use format string arguments */
1126        connection->ack_sender =
1127                alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
1128        if (!connection->ack_sender) {
1129                drbd_err(connection, "Failed to create workqueue ack_sender\n");
1130                return 0;
1131        }
1132
1133        mutex_lock(&connection->resource->conf_update);
1134        /* The discard_my_data flag is a single-shot modifier to the next
1135         * connection attempt, the handshake of which is now well underway.
1136         * No need for rcu style copying of the whole struct
1137         * just to clear a single value. */
1138        connection->net_conf->discard_my_data = 0;
1139        mutex_unlock(&connection->resource->conf_update);
1140
1141        return h;
1142
1143out_release_sockets:
1144        if (ad.s_listen)
1145                sock_release(ad.s_listen);
1146        if (sock.socket)
1147                sock_release(sock.socket);
1148        if (msock.socket)
1149                sock_release(msock.socket);
1150        return -1;
1151}
1152
1153static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1154{
1155        unsigned int header_size = drbd_header_size(connection);
1156
1157        if (header_size == sizeof(struct p_header100) &&
1158            *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1159                struct p_header100 *h = header;
1160                if (h->pad != 0) {
1161                        drbd_err(connection, "Header padding is not zero\n");
1162                        return -EINVAL;
1163                }
1164                pi->vnr = be16_to_cpu(h->volume);
1165                pi->cmd = be16_to_cpu(h->command);
1166                pi->size = be32_to_cpu(h->length);
1167        } else if (header_size == sizeof(struct p_header95) &&
1168                   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1169                struct p_header95 *h = header;
1170                pi->cmd = be16_to_cpu(h->command);
1171                pi->size = be32_to_cpu(h->length);
1172                pi->vnr = 0;
1173        } else if (header_size == sizeof(struct p_header80) &&
1174                   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1175                struct p_header80 *h = header;
1176                pi->cmd = be16_to_cpu(h->command);
1177                pi->size = be16_to_cpu(h->length);
1178                pi->vnr = 0;
1179        } else {
1180                drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1181                         be32_to_cpu(*(__be32 *)header),
1182                         connection->agreed_pro_version);
1183                return -EINVAL;
1184        }
1185        pi->data = header + header_size;
1186        return 0;
1187}
1188
1189static void drbd_unplug_all_devices(struct drbd_connection *connection)
1190{
1191        if (current->plug == &connection->receiver_plug) {
1192                blk_finish_plug(&connection->receiver_plug);
1193                blk_start_plug(&connection->receiver_plug);
1194        } /* else: maybe just schedule() ?? */
1195}
1196
1197static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1198{
1199        void *buffer = connection->data.rbuf;
1200        int err;
1201
1202        err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1203        if (err)
1204                return err;
1205
1206        err = decode_header(connection, buffer, pi);
1207        connection->last_received = jiffies;
1208
1209        return err;
1210}
1211
1212static int drbd_recv_header_maybe_unplug(struct drbd_connection *connection, struct packet_info *pi)
1213{
1214        void *buffer = connection->data.rbuf;
1215        unsigned int size = drbd_header_size(connection);
1216        int err;
1217
1218        err = drbd_recv_short(connection->data.socket, buffer, size, MSG_NOSIGNAL|MSG_DONTWAIT);
1219        if (err != size) {
1220                /* If we have nothing in the receive buffer now, to reduce
1221                 * application latency, try to drain the backend queues as
1222                 * quickly as possible, and let remote TCP know what we have
1223                 * received so far. */
1224                if (err == -EAGAIN) {
1225                        drbd_tcp_quickack(connection->data.socket);
1226                        drbd_unplug_all_devices(connection);
1227                }
1228                if (err > 0) {
1229                        buffer += err;
1230                        size -= err;
1231                }
1232                err = drbd_recv_all_warn(connection, buffer, size);
1233                if (err)
1234                        return err;
1235        }
1236
1237        err = decode_header(connection, connection->data.rbuf, pi);
1238        connection->last_received = jiffies;
1239
1240        return err;
1241}
1242/* This is blkdev_issue_flush, but asynchronous.
1243 * We want to submit to all component volumes in parallel,
1244 * then wait for all completions.
1245 */
1246struct issue_flush_context {
1247        atomic_t pending;
1248        int error;
1249        struct completion done;
1250};
1251struct one_flush_context {
1252        struct drbd_device *device;
1253        struct issue_flush_context *ctx;
1254};
1255
1256static void one_flush_endio(struct bio *bio)
1257{
1258        struct one_flush_context *octx = bio->bi_private;
1259        struct drbd_device *device = octx->device;
1260        struct issue_flush_context *ctx = octx->ctx;
1261
1262        if (bio->bi_status) {
1263                ctx->error = blk_status_to_errno(bio->bi_status);
1264                drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_status);
1265        }
1266        kfree(octx);
1267        bio_put(bio);
1268
1269        clear_bit(FLUSH_PENDING, &device->flags);
1270        put_ldev(device);
1271        kref_put(&device->kref, drbd_destroy_device);
1272
1273        if (atomic_dec_and_test(&ctx->pending))
1274                complete(&ctx->done);
1275}
1276
1277static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
1278{
1279        struct bio *bio = bio_alloc(GFP_NOIO, 0);
1280        struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);
1281        if (!bio || !octx) {
1282                drbd_warn(device, "Could not allocate a bio, CANNOT ISSUE FLUSH\n");
1283                /* FIXME: what else can I do now?  disconnecting or detaching
1284                 * really does not help to improve the state of the world, either.
1285                 */
1286                kfree(octx);
1287                if (bio)
1288                        bio_put(bio);
1289
1290                ctx->error = -ENOMEM;
1291                put_ldev(device);
1292                kref_put(&device->kref, drbd_destroy_device);
1293                return;
1294        }
1295
1296        octx->device = device;
1297        octx->ctx = ctx;
1298        bio_set_dev(bio, device->ldev->backing_bdev);
1299        bio->bi_private = octx;
1300        bio->bi_end_io = one_flush_endio;
1301        bio->bi_opf = REQ_OP_FLUSH | REQ_PREFLUSH;
1302
1303        device->flush_jif = jiffies;
1304        set_bit(FLUSH_PENDING, &device->flags);
1305        atomic_inc(&ctx->pending);
1306        submit_bio(bio);
1307}
1308
1309static void drbd_flush(struct drbd_connection *connection)
1310{
1311        if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1312                struct drbd_peer_device *peer_device;
1313                struct issue_flush_context ctx;
1314                int vnr;
1315
1316                atomic_set(&ctx.pending, 1);
1317                ctx.error = 0;
1318                init_completion(&ctx.done);
1319
1320                rcu_read_lock();
1321                idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1322                        struct drbd_device *device = peer_device->device;
1323
1324                        if (!get_ldev(device))
1325                                continue;
1326                        kref_get(&device->kref);
1327                        rcu_read_unlock();
1328
1329                        submit_one_flush(device, &ctx);
1330
1331                        rcu_read_lock();
1332                }
1333                rcu_read_unlock();
1334
1335                /* Do we want to add a timeout,
1336                 * if disk-timeout is set? */
1337                if (!atomic_dec_and_test(&ctx.pending))
1338                        wait_for_completion(&ctx.done);
1339
1340                if (ctx.error) {
1341                        /* would rather check on EOPNOTSUPP, but that is not reliable.
1342                         * don't try again for ANY return value != 0
1343                         * if (rv == -EOPNOTSUPP) */
1344                        /* Any error is already reported by bio_endio callback. */
1345                        drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1346                }
1347        }
1348}
1349
1350/**
1351 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1352 * @device:     DRBD device.
1353 * @epoch:      Epoch object.
1354 * @ev:         Epoch event.
1355 */
1356static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1357                                               struct drbd_epoch *epoch,
1358                                               enum epoch_event ev)
1359{
1360        int epoch_size;
1361        struct drbd_epoch *next_epoch;
1362        enum finish_epoch rv = FE_STILL_LIVE;
1363
1364        spin_lock(&connection->epoch_lock);
1365        do {
1366                next_epoch = NULL;
1367
1368                epoch_size = atomic_read(&epoch->epoch_size);
1369
1370                switch (ev & ~EV_CLEANUP) {
1371                case EV_PUT:
1372                        atomic_dec(&epoch->active);
1373                        break;
1374                case EV_GOT_BARRIER_NR:
1375                        set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1376                        break;
1377                case EV_BECAME_LAST:
1378                        /* nothing to do*/
1379                        break;
1380                }
1381
1382                if (epoch_size != 0 &&
1383                    atomic_read(&epoch->active) == 0 &&
1384                    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1385                        if (!(ev & EV_CLEANUP)) {
1386                                spin_unlock(&connection->epoch_lock);
1387                                drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1388                                spin_lock(&connection->epoch_lock);
1389                        }
1390#if 0
1391                        /* FIXME: dec unacked on connection, once we have
1392                         * something to count pending connection packets in. */
1393                        if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1394                                dec_unacked(epoch->connection);
1395#endif
1396
1397                        if (connection->current_epoch != epoch) {
1398                                next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1399                                list_del(&epoch->list);
1400                                ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1401                                connection->epochs--;
1402                                kfree(epoch);
1403
1404                                if (rv == FE_STILL_LIVE)
1405                                        rv = FE_DESTROYED;
1406                        } else {
1407                                epoch->flags = 0;
1408                                atomic_set(&epoch->epoch_size, 0);
1409                                /* atomic_set(&epoch->active, 0); is already zero */
1410                                if (rv == FE_STILL_LIVE)
1411                                        rv = FE_RECYCLED;
1412                        }
1413                }
1414
1415                if (!next_epoch)
1416                        break;
1417
1418                epoch = next_epoch;
1419        } while (1);
1420
1421        spin_unlock(&connection->epoch_lock);
1422
1423        return rv;
1424}
1425
1426static enum write_ordering_e
1427max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1428{
1429        struct disk_conf *dc;
1430
1431        dc = rcu_dereference(bdev->disk_conf);
1432
1433        if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1434                wo = WO_DRAIN_IO;
1435        if (wo == WO_DRAIN_IO && !dc->disk_drain)
1436                wo = WO_NONE;
1437
1438        return wo;
1439}
1440
1441/**
1442 * drbd_bump_write_ordering() - Fall back to an other write ordering method
1443 * @connection: DRBD connection.
1444 * @wo:         Write ordering method to try.
1445 */
1446void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1447                              enum write_ordering_e wo)
1448{
1449        struct drbd_device *device;
1450        enum write_ordering_e pwo;
1451        int vnr;
1452        static char *write_ordering_str[] = {
1453                [WO_NONE] = "none",
1454                [WO_DRAIN_IO] = "drain",
1455                [WO_BDEV_FLUSH] = "flush",
1456        };
1457
1458        pwo = resource->write_ordering;
1459        if (wo != WO_BDEV_FLUSH)
1460                wo = min(pwo, wo);
1461        rcu_read_lock();
1462        idr_for_each_entry(&resource->devices, device, vnr) {
1463                if (get_ldev(device)) {
1464                        wo = max_allowed_wo(device->ldev, wo);
1465                        if (device->ldev == bdev)
1466                                bdev = NULL;
1467                        put_ldev(device);
1468                }
1469        }
1470
1471        if (bdev)
1472                wo = max_allowed_wo(bdev, wo);
1473
1474        rcu_read_unlock();
1475
1476        resource->write_ordering = wo;
1477        if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1478                drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1479}
1480
1481/*
1482 * Mapping "discard" to ZEROOUT with UNMAP does not work for us:
1483 * Drivers have to "announce" q->limits.max_write_zeroes_sectors, or it
1484 * will directly go to fallback mode, submitting normal writes, and
1485 * never even try to UNMAP.
1486 *
1487 * And dm-thin does not do this (yet), mostly because in general it has
1488 * to assume that "skip_block_zeroing" is set.  See also:
1489 * https://www.mail-archive.com/dm-devel%40redhat.com/msg07965.html
1490 * https://www.redhat.com/archives/dm-devel/2018-January/msg00271.html
1491 *
1492 * We *may* ignore the discard-zeroes-data setting, if so configured.
1493 *
1494 * Assumption is that this "discard_zeroes_data=0" is only because the backend
1495 * may ignore partial unaligned discards.
1496 *
1497 * LVM/DM thin as of at least
1498 *   LVM version:     2.02.115(2)-RHEL7 (2015-01-28)
1499 *   Library version: 1.02.93-RHEL7 (2015-01-28)
1500 *   Driver version:  4.29.0
1501 * still behaves this way.
1502 *
1503 * For unaligned (wrt. alignment and granularity) or too small discards,
1504 * we zero-out the initial (and/or) trailing unaligned partial chunks,
1505 * but discard all the aligned full chunks.
1506 *
1507 * At least for LVM/DM thin, with skip_block_zeroing=false,
1508 * the result is effectively "discard_zeroes_data=1".
1509 */
1510/* flags: EE_TRIM|EE_ZEROOUT */
1511int drbd_issue_discard_or_zero_out(struct drbd_device *device, sector_t start, unsigned int nr_sectors, int flags)
1512{
1513        struct block_device *bdev = device->ldev->backing_bdev;
1514        struct request_queue *q = bdev_get_queue(bdev);
1515        sector_t tmp, nr;
1516        unsigned int max_discard_sectors, granularity;
1517        int alignment;
1518        int err = 0;
1519
1520        if ((flags & EE_ZEROOUT) || !(flags & EE_TRIM))
1521                goto zero_out;
1522
1523        /* Zero-sector (unknown) and one-sector granularities are the same.  */
1524        granularity = max(q->limits.discard_granularity >> 9, 1U);
1525        alignment = (bdev_discard_alignment(bdev) >> 9) % granularity;
1526
1527        max_discard_sectors = min(q->limits.max_discard_sectors, (1U << 22));
1528        max_discard_sectors -= max_discard_sectors % granularity;
1529        if (unlikely(!max_discard_sectors))
1530                goto zero_out;
1531
1532        if (nr_sectors < granularity)
1533                goto zero_out;
1534
1535        tmp = start;
1536        if (sector_div(tmp, granularity) != alignment) {
1537                if (nr_sectors < 2*granularity)
1538                        goto zero_out;
1539                /* start + gran - (start + gran - align) % gran */
1540                tmp = start + granularity - alignment;
1541                tmp = start + granularity - sector_div(tmp, granularity);
1542
1543                nr = tmp - start;
1544                /* don't flag BLKDEV_ZERO_NOUNMAP, we don't know how many
1545                 * layers are below us, some may have smaller granularity */
1546                err |= blkdev_issue_zeroout(bdev, start, nr, GFP_NOIO, 0);
1547                nr_sectors -= nr;
1548                start = tmp;
1549        }
1550        while (nr_sectors >= max_discard_sectors) {
1551                err |= blkdev_issue_discard(bdev, start, max_discard_sectors, GFP_NOIO, 0);
1552                nr_sectors -= max_discard_sectors;
1553                start += max_discard_sectors;
1554        }
1555        if (nr_sectors) {
1556                /* max_discard_sectors is unsigned int (and a multiple of
1557                 * granularity, we made sure of that above already);
1558                 * nr is < max_discard_sectors;
1559                 * I don't need sector_div here, even though nr is sector_t */
1560                nr = nr_sectors;
1561                nr -= (unsigned int)nr % granularity;
1562                if (nr) {
1563                        err |= blkdev_issue_discard(bdev, start, nr, GFP_NOIO, 0);
1564                        nr_sectors -= nr;
1565                        start += nr;
1566                }
1567        }
1568 zero_out:
1569        if (nr_sectors) {
1570                err |= blkdev_issue_zeroout(bdev, start, nr_sectors, GFP_NOIO,
1571                                (flags & EE_TRIM) ? 0 : BLKDEV_ZERO_NOUNMAP);
1572        }
1573        return err != 0;
1574}
1575
1576static bool can_do_reliable_discards(struct drbd_device *device)
1577{
1578        struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
1579        struct disk_conf *dc;
1580        bool can_do;
1581
1582        if (!blk_queue_discard(q))
1583                return false;
1584
1585        rcu_read_lock();
1586        dc = rcu_dereference(device->ldev->disk_conf);
1587        can_do = dc->discard_zeroes_if_aligned;
1588        rcu_read_unlock();
1589        return can_do;
1590}
1591
1592static void drbd_issue_peer_discard_or_zero_out(struct drbd_device *device, struct drbd_peer_request *peer_req)
1593{
1594        /* If the backend cannot discard, or does not guarantee
1595         * read-back zeroes in discarded ranges, we fall back to
1596         * zero-out.  Unless configuration specifically requested
1597         * otherwise. */
1598        if (!can_do_reliable_discards(device))
1599                peer_req->flags |= EE_ZEROOUT;
1600
1601        if (drbd_issue_discard_or_zero_out(device, peer_req->i.sector,
1602            peer_req->i.size >> 9, peer_req->flags & (EE_ZEROOUT|EE_TRIM)))
1603                peer_req->flags |= EE_WAS_ERROR;
1604        drbd_endio_write_sec_final(peer_req);
1605}
1606
1607static void drbd_issue_peer_wsame(struct drbd_device *device,
1608                                  struct drbd_peer_request *peer_req)
1609{
1610        struct block_device *bdev = device->ldev->backing_bdev;
1611        sector_t s = peer_req->i.sector;
1612        sector_t nr = peer_req->i.size >> 9;
1613        if (blkdev_issue_write_same(bdev, s, nr, GFP_NOIO, peer_req->pages))
1614                peer_req->flags |= EE_WAS_ERROR;
1615        drbd_endio_write_sec_final(peer_req);
1616}
1617
1618
1619/**
1620 * drbd_submit_peer_request()
1621 * @device:     DRBD device.
1622 * @peer_req:   peer request
1623 * @rw:         flag field, see bio->bi_opf
1624 *
1625 * May spread the pages to multiple bios,
1626 * depending on bio_add_page restrictions.
1627 *
1628 * Returns 0 if all bios have been submitted,
1629 * -ENOMEM if we could not allocate enough bios,
1630 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1631 *  single page to an empty bio (which should never happen and likely indicates
1632 *  that the lower level IO stack is in some way broken). This has been observed
1633 *  on certain Xen deployments.
1634 */
1635/* TODO allocate from our own bio_set. */
1636int drbd_submit_peer_request(struct drbd_device *device,
1637                             struct drbd_peer_request *peer_req,
1638                             const unsigned op, const unsigned op_flags,
1639                             const int fault_type)
1640{
1641        struct bio *bios = NULL;
1642        struct bio *bio;
1643        struct page *page = peer_req->pages;
1644        sector_t sector = peer_req->i.sector;
1645        unsigned data_size = peer_req->i.size;
1646        unsigned n_bios = 0;
1647        unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1648        int err = -ENOMEM;
1649
1650        /* TRIM/DISCARD: for now, always use the helper function
1651         * blkdev_issue_zeroout(..., discard=true).
1652         * It's synchronous, but it does the right thing wrt. bio splitting.
1653         * Correctness first, performance later.  Next step is to code an
1654         * asynchronous variant of the same.
1655         */
1656        if (peer_req->flags & (EE_TRIM|EE_WRITE_SAME|EE_ZEROOUT)) {
1657                /* wait for all pending IO completions, before we start
1658                 * zeroing things out. */
1659                conn_wait_active_ee_empty(peer_req->peer_device->connection);
1660                /* add it to the active list now,
1661                 * so we can find it to present it in debugfs */
1662                peer_req->submit_jif = jiffies;
1663                peer_req->flags |= EE_SUBMITTED;
1664
1665                /* If this was a resync request from receive_rs_deallocated(),
1666                 * it is already on the sync_ee list */
1667                if (list_empty(&peer_req->w.list)) {
1668                        spin_lock_irq(&device->resource->req_lock);
1669                        list_add_tail(&peer_req->w.list, &device->active_ee);
1670                        spin_unlock_irq(&device->resource->req_lock);
1671                }
1672
1673                if (peer_req->flags & (EE_TRIM|EE_ZEROOUT))
1674                        drbd_issue_peer_discard_or_zero_out(device, peer_req);
1675                else /* EE_WRITE_SAME */
1676                        drbd_issue_peer_wsame(device, peer_req);
1677                return 0;
1678        }
1679
1680        /* In most cases, we will only need one bio.  But in case the lower
1681         * level restrictions happen to be different at this offset on this
1682         * side than those of the sending peer, we may need to submit the
1683         * request in more than one bio.
1684         *
1685         * Plain bio_alloc is good enough here, this is no DRBD internally
1686         * generated bio, but a bio allocated on behalf of the peer.
1687         */
1688next_bio:
1689        bio = bio_alloc(GFP_NOIO, nr_pages);
1690        if (!bio) {
1691                drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1692                goto fail;
1693        }
1694        /* > peer_req->i.sector, unless this is the first bio */
1695        bio->bi_iter.bi_sector = sector;
1696        bio_set_dev(bio, device->ldev->backing_bdev);
1697        bio_set_op_attrs(bio, op, op_flags);
1698        bio->bi_private = peer_req;
1699        bio->bi_end_io = drbd_peer_request_endio;
1700
1701        bio->bi_next = bios;
1702        bios = bio;
1703        ++n_bios;
1704
1705        page_chain_for_each(page) {
1706                unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1707                if (!bio_add_page(bio, page, len, 0))
1708                        goto next_bio;
1709                data_size -= len;
1710                sector += len >> 9;
1711                --nr_pages;
1712        }
1713        D_ASSERT(device, data_size == 0);
1714        D_ASSERT(device, page == NULL);
1715
1716        atomic_set(&peer_req->pending_bios, n_bios);
1717        /* for debugfs: update timestamp, mark as submitted */
1718        peer_req->submit_jif = jiffies;
1719        peer_req->flags |= EE_SUBMITTED;
1720        do {
1721                bio = bios;
1722                bios = bios->bi_next;
1723                bio->bi_next = NULL;
1724
1725                drbd_generic_make_request(device, fault_type, bio);
1726        } while (bios);
1727        return 0;
1728
1729fail:
1730        while (bios) {
1731                bio = bios;
1732                bios = bios->bi_next;
1733                bio_put(bio);
1734        }
1735        return err;
1736}
1737
1738static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1739                                             struct drbd_peer_request *peer_req)
1740{
1741        struct drbd_interval *i = &peer_req->i;
1742
1743        drbd_remove_interval(&device->write_requests, i);
1744        drbd_clear_interval(i);
1745
1746        /* Wake up any processes waiting for this peer request to complete.  */
1747        if (i->waiting)
1748                wake_up(&device->misc_wait);
1749}
1750
1751static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1752{
1753        struct drbd_peer_device *peer_device;
1754        int vnr;
1755
1756        rcu_read_lock();
1757        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1758                struct drbd_device *device = peer_device->device;
1759
1760                kref_get(&device->kref);
1761                rcu_read_unlock();
1762                drbd_wait_ee_list_empty(device, &device->active_ee);
1763                kref_put(&device->kref, drbd_destroy_device);
1764                rcu_read_lock();
1765        }
1766        rcu_read_unlock();
1767}
1768
1769static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1770{
1771        int rv;
1772        struct p_barrier *p = pi->data;
1773        struct drbd_epoch *epoch;
1774
1775        /* FIXME these are unacked on connection,
1776         * not a specific (peer)device.
1777         */
1778        connection->current_epoch->barrier_nr = p->barrier;
1779        connection->current_epoch->connection = connection;
1780        rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1781
1782        /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1783         * the activity log, which means it would not be resynced in case the
1784         * R_PRIMARY crashes now.
1785         * Therefore we must send the barrier_ack after the barrier request was
1786         * completed. */
1787        switch (connection->resource->write_ordering) {
1788        case WO_NONE:
1789                if (rv == FE_RECYCLED)
1790                        return 0;
1791
1792                /* receiver context, in the writeout path of the other node.
1793                 * avoid potential distributed deadlock */
1794                epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1795                if (epoch)
1796                        break;
1797                else
1798                        drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1799                        /* Fall through */
1800
1801        case WO_BDEV_FLUSH:
1802        case WO_DRAIN_IO:
1803                conn_wait_active_ee_empty(connection);
1804                drbd_flush(connection);
1805
1806                if (atomic_read(&connection->current_epoch->epoch_size)) {
1807                        epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1808                        if (epoch)
1809                                break;
1810                }
1811
1812                return 0;
1813        default:
1814                drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1815                         connection->resource->write_ordering);
1816                return -EIO;
1817        }
1818
1819        epoch->flags = 0;
1820        atomic_set(&epoch->epoch_size, 0);
1821        atomic_set(&epoch->active, 0);
1822
1823        spin_lock(&connection->epoch_lock);
1824        if (atomic_read(&connection->current_epoch->epoch_size)) {
1825                list_add(&epoch->list, &connection->current_epoch->list);
1826                connection->current_epoch = epoch;
1827                connection->epochs++;
1828        } else {
1829                /* The current_epoch got recycled while we allocated this one... */
1830                kfree(epoch);
1831        }
1832        spin_unlock(&connection->epoch_lock);
1833
1834        return 0;
1835}
1836
1837/* quick wrapper in case payload size != request_size (write same) */
1838static void drbd_csum_ee_size(struct crypto_shash *h,
1839                              struct drbd_peer_request *r, void *d,
1840                              unsigned int payload_size)
1841{
1842        unsigned int tmp = r->i.size;
1843        r->i.size = payload_size;
1844        drbd_csum_ee(h, r, d);
1845        r->i.size = tmp;
1846}
1847
1848/* used from receive_RSDataReply (recv_resync_read)
1849 * and from receive_Data.
1850 * data_size: actual payload ("data in")
1851 *      for normal writes that is bi_size.
1852 *      for discards, that is zero.
1853 *      for write same, it is logical_block_size.
1854 * both trim and write same have the bi_size ("data len to be affected")
1855 * as extra argument in the packet header.
1856 */
1857static struct drbd_peer_request *
1858read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1859              struct packet_info *pi) __must_hold(local)
1860{
1861        struct drbd_device *device = peer_device->device;
1862        const sector_t capacity = drbd_get_capacity(device->this_bdev);
1863        struct drbd_peer_request *peer_req;
1864        struct page *page;
1865        int digest_size, err;
1866        unsigned int data_size = pi->size, ds;
1867        void *dig_in = peer_device->connection->int_dig_in;
1868        void *dig_vv = peer_device->connection->int_dig_vv;
1869        unsigned long *data;
1870        struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1871        struct p_trim *zeroes = (pi->cmd == P_ZEROES) ? pi->data : NULL;
1872        struct p_trim *wsame = (pi->cmd == P_WSAME) ? pi->data : NULL;
1873
1874        digest_size = 0;
1875        if (!trim && peer_device->connection->peer_integrity_tfm) {
1876                digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
1877                /*
1878                 * FIXME: Receive the incoming digest into the receive buffer
1879                 *        here, together with its struct p_data?
1880                 */
1881                err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1882                if (err)
1883                        return NULL;
1884                data_size -= digest_size;
1885        }
1886
1887        /* assume request_size == data_size, but special case trim and wsame. */
1888        ds = data_size;
1889        if (trim) {
1890                if (!expect(data_size == 0))
1891                        return NULL;
1892                ds = be32_to_cpu(trim->size);
1893        } else if (zeroes) {
1894                if (!expect(data_size == 0))
1895                        return NULL;
1896                ds = be32_to_cpu(zeroes->size);
1897        } else if (wsame) {
1898                if (data_size != queue_logical_block_size(device->rq_queue)) {
1899                        drbd_err(peer_device, "data size (%u) != drbd logical block size (%u)\n",
1900                                data_size, queue_logical_block_size(device->rq_queue));
1901                        return NULL;
1902                }
1903                if (data_size != bdev_logical_block_size(device->ldev->backing_bdev)) {
1904                        drbd_err(peer_device, "data size (%u) != backend logical block size (%u)\n",
1905                                data_size, bdev_logical_block_size(device->ldev->backing_bdev));
1906                        return NULL;
1907                }
1908                ds = be32_to_cpu(wsame->size);
1909        }
1910
1911        if (!expect(IS_ALIGNED(ds, 512)))
1912                return NULL;
1913        if (trim || wsame || zeroes) {
1914                if (!expect(ds <= (DRBD_MAX_BBIO_SECTORS << 9)))
1915                        return NULL;
1916        } else if (!expect(ds <= DRBD_MAX_BIO_SIZE))
1917                return NULL;
1918
1919        /* even though we trust out peer,
1920         * we sometimes have to double check. */
1921        if (sector + (ds>>9) > capacity) {
1922                drbd_err(device, "request from peer beyond end of local disk: "
1923                        "capacity: %llus < sector: %llus + size: %u\n",
1924                        (unsigned long long)capacity,
1925                        (unsigned long long)sector, ds);
1926                return NULL;
1927        }
1928
1929        /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1930         * "criss-cross" setup, that might cause write-out on some other DRBD,
1931         * which in turn might block on the other node at this very place.  */
1932        peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO);
1933        if (!peer_req)
1934                return NULL;
1935
1936        peer_req->flags |= EE_WRITE;
1937        if (trim) {
1938                peer_req->flags |= EE_TRIM;
1939                return peer_req;
1940        }
1941        if (zeroes) {
1942                peer_req->flags |= EE_ZEROOUT;
1943                return peer_req;
1944        }
1945        if (wsame)
1946                peer_req->flags |= EE_WRITE_SAME;
1947
1948        /* receive payload size bytes into page chain */
1949        ds = data_size;
1950        page = peer_req->pages;
1951        page_chain_for_each(page) {
1952                unsigned len = min_t(int, ds, PAGE_SIZE);
1953                data = kmap(page);
1954                err = drbd_recv_all_warn(peer_device->connection, data, len);
1955                if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1956                        drbd_err(device, "Fault injection: Corrupting data on receive\n");
1957                        data[0] = data[0] ^ (unsigned long)-1;
1958                }
1959                kunmap(page);
1960                if (err) {
1961                        drbd_free_peer_req(device, peer_req);
1962                        return NULL;
1963                }
1964                ds -= len;
1965        }
1966
1967        if (digest_size) {
1968                drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size);
1969                if (memcmp(dig_in, dig_vv, digest_size)) {
1970                        drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1971                                (unsigned long long)sector, data_size);
1972                        drbd_free_peer_req(device, peer_req);
1973                        return NULL;
1974                }
1975        }
1976        device->recv_cnt += data_size >> 9;
1977        return peer_req;
1978}
1979
1980/* drbd_drain_block() just takes a data block
1981 * out of the socket input buffer, and discards it.
1982 */
1983static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1984{
1985        struct page *page;
1986        int err = 0;
1987        void *data;
1988
1989        if (!data_size)
1990                return 0;
1991
1992        page = drbd_alloc_pages(peer_device, 1, 1);
1993
1994        data = kmap(page);
1995        while (data_size) {
1996                unsigned int len = min_t(int, data_size, PAGE_SIZE);
1997
1998                err = drbd_recv_all_warn(peer_device->connection, data, len);
1999                if (err)
2000                        break;
2001                data_size -= len;
2002        }
2003        kunmap(page);
2004        drbd_free_pages(peer_device->device, page, 0);
2005        return err;
2006}
2007
2008static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
2009                           sector_t sector, int data_size)
2010{
2011        struct bio_vec bvec;
2012        struct bvec_iter iter;
2013        struct bio *bio;
2014        int digest_size, err, expect;
2015        void *dig_in = peer_device->connection->int_dig_in;
2016        void *dig_vv = peer_device->connection->int_dig_vv;
2017
2018        digest_size = 0;
2019        if (peer_device->connection->peer_integrity_tfm) {
2020                digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
2021                err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
2022                if (err)
2023                        return err;
2024                data_size -= digest_size;
2025        }
2026
2027        /* optimistically update recv_cnt.  if receiving fails below,
2028         * we disconnect anyways, and counters will be reset. */
2029        peer_device->device->recv_cnt += data_size>>9;
2030
2031        bio = req->master_bio;
2032        D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
2033
2034        bio_for_each_segment(bvec, bio, iter) {
2035                void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
2036                expect = min_t(int, data_size, bvec.bv_len);
2037                err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
2038                kunmap(bvec.bv_page);
2039                if (err)
2040                        return err;
2041                data_size -= expect;
2042        }
2043
2044        if (digest_size) {
2045                drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
2046                if (memcmp(dig_in, dig_vv, digest_size)) {
2047                        drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
2048                        return -EINVAL;
2049                }
2050        }
2051
2052        D_ASSERT(peer_device->device, data_size == 0);
2053        return 0;
2054}
2055
2056/*
2057 * e_end_resync_block() is called in ack_sender context via
2058 * drbd_finish_peer_reqs().
2059 */
2060static int e_end_resync_block(struct drbd_work *w, int unused)
2061{
2062        struct drbd_peer_request *peer_req =
2063                container_of(w, struct drbd_peer_request, w);
2064        struct drbd_peer_device *peer_device = peer_req->peer_device;
2065        struct drbd_device *device = peer_device->device;
2066        sector_t sector = peer_req->i.sector;
2067        int err;
2068
2069        D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2070
2071        if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2072                drbd_set_in_sync(device, sector, peer_req->i.size);
2073                err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
2074        } else {
2075                /* Record failure to sync */
2076                drbd_rs_failed_io(device, sector, peer_req->i.size);
2077
2078                err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2079        }
2080        dec_unacked(device);
2081
2082        return err;
2083}
2084
2085static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
2086                            struct packet_info *pi) __releases(local)
2087{
2088        struct drbd_device *device = peer_device->device;
2089        struct drbd_peer_request *peer_req;
2090
2091        peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
2092        if (!peer_req)
2093                goto fail;
2094
2095        dec_rs_pending(device);
2096
2097        inc_unacked(device);
2098        /* corresponding dec_unacked() in e_end_resync_block()
2099         * respective _drbd_clear_done_ee */
2100
2101        peer_req->w.cb = e_end_resync_block;
2102        peer_req->submit_jif = jiffies;
2103
2104        spin_lock_irq(&device->resource->req_lock);
2105        list_add_tail(&peer_req->w.list, &device->sync_ee);
2106        spin_unlock_irq(&device->resource->req_lock);
2107
2108        atomic_add(pi->size >> 9, &device->rs_sect_ev);
2109        if (drbd_submit_peer_request(device, peer_req, REQ_OP_WRITE, 0,
2110                                     DRBD_FAULT_RS_WR) == 0)
2111                return 0;
2112
2113        /* don't care for the reason here */
2114        drbd_err(device, "submit failed, triggering re-connect\n");
2115        spin_lock_irq(&device->resource->req_lock);
2116        list_del(&peer_req->w.list);
2117        spin_unlock_irq(&device->resource->req_lock);
2118
2119        drbd_free_peer_req(device, peer_req);
2120fail:
2121        put_ldev(device);
2122        return -EIO;
2123}
2124
2125static struct drbd_request *
2126find_request(struct drbd_device *device, struct rb_root *root, u64 id,
2127             sector_t sector, bool missing_ok, const char *func)
2128{
2129        struct drbd_request *req;
2130
2131        /* Request object according to our peer */
2132        req = (struct drbd_request *)(unsigned long)id;
2133        if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
2134                return req;
2135        if (!missing_ok) {
2136                drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
2137                        (unsigned long)id, (unsigned long long)sector);
2138        }
2139        return NULL;
2140}
2141
2142static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
2143{
2144        struct drbd_peer_device *peer_device;
2145        struct drbd_device *device;
2146        struct drbd_request *req;
2147        sector_t sector;
2148        int err;
2149        struct p_data *p = pi->data;
2150
2151        peer_device = conn_peer_device(connection, pi->vnr);
2152        if (!peer_device)
2153                return -EIO;
2154        device = peer_device->device;
2155
2156        sector = be64_to_cpu(p->sector);
2157
2158        spin_lock_irq(&device->resource->req_lock);
2159        req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
2160        spin_unlock_irq(&device->resource->req_lock);
2161        if (unlikely(!req))
2162                return -EIO;
2163
2164        /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
2165         * special casing it there for the various failure cases.
2166         * still no race with drbd_fail_pending_reads */
2167        err = recv_dless_read(peer_device, req, sector, pi->size);
2168        if (!err)
2169                req_mod(req, DATA_RECEIVED);
2170        /* else: nothing. handled from drbd_disconnect...
2171         * I don't think we may complete this just yet
2172         * in case we are "on-disconnect: freeze" */
2173
2174        return err;
2175}
2176
2177static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
2178{
2179        struct drbd_peer_device *peer_device;
2180        struct drbd_device *device;
2181        sector_t sector;
2182        int err;
2183        struct p_data *p = pi->data;
2184
2185        peer_device = conn_peer_device(connection, pi->vnr);
2186        if (!peer_device)
2187                return -EIO;
2188        device = peer_device->device;
2189
2190        sector = be64_to_cpu(p->sector);
2191        D_ASSERT(device, p->block_id == ID_SYNCER);
2192
2193        if (get_ldev(device)) {
2194                /* data is submitted to disk within recv_resync_read.
2195                 * corresponding put_ldev done below on error,
2196                 * or in drbd_peer_request_endio. */
2197                err = recv_resync_read(peer_device, sector, pi);
2198        } else {
2199                if (__ratelimit(&drbd_ratelimit_state))
2200                        drbd_err(device, "Can not write resync data to local disk.\n");
2201
2202                err = drbd_drain_block(peer_device, pi->size);
2203
2204                drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2205        }
2206
2207        atomic_add(pi->size >> 9, &device->rs_sect_in);
2208
2209        return err;
2210}
2211
2212static void restart_conflicting_writes(struct drbd_device *device,
2213                                       sector_t sector, int size)
2214{
2215        struct drbd_interval *i;
2216        struct drbd_request *req;
2217
2218        drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2219                if (!i->local)
2220                        continue;
2221                req = container_of(i, struct drbd_request, i);
2222                if (req->rq_state & RQ_LOCAL_PENDING ||
2223                    !(req->rq_state & RQ_POSTPONED))
2224                        continue;
2225                /* as it is RQ_POSTPONED, this will cause it to
2226                 * be queued on the retry workqueue. */
2227                __req_mod(req, CONFLICT_RESOLVED, NULL);
2228        }
2229}
2230
2231/*
2232 * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
2233 */
2234static int e_end_block(struct drbd_work *w, int cancel)
2235{
2236        struct drbd_peer_request *peer_req =
2237                container_of(w, struct drbd_peer_request, w);
2238        struct drbd_peer_device *peer_device = peer_req->peer_device;
2239        struct drbd_device *device = peer_device->device;
2240        sector_t sector = peer_req->i.sector;
2241        int err = 0, pcmd;
2242
2243        if (peer_req->flags & EE_SEND_WRITE_ACK) {
2244                if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2245                        pcmd = (device->state.conn >= C_SYNC_SOURCE &&
2246                                device->state.conn <= C_PAUSED_SYNC_T &&
2247                                peer_req->flags & EE_MAY_SET_IN_SYNC) ?
2248                                P_RS_WRITE_ACK : P_WRITE_ACK;
2249                        err = drbd_send_ack(peer_device, pcmd, peer_req);
2250                        if (pcmd == P_RS_WRITE_ACK)
2251                                drbd_set_in_sync(device, sector, peer_req->i.size);
2252                } else {
2253                        err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2254                        /* we expect it to be marked out of sync anyways...
2255                         * maybe assert this?  */
2256                }
2257                dec_unacked(device);
2258        }
2259
2260        /* we delete from the conflict detection hash _after_ we sent out the
2261         * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
2262        if (peer_req->flags & EE_IN_INTERVAL_TREE) {
2263                spin_lock_irq(&device->resource->req_lock);
2264                D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
2265                drbd_remove_epoch_entry_interval(device, peer_req);
2266                if (peer_req->flags & EE_RESTART_REQUESTS)
2267                        restart_conflicting_writes(device, sector, peer_req->i.size);
2268                spin_unlock_irq(&device->resource->req_lock);
2269        } else
2270                D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2271
2272        drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
2273
2274        return err;
2275}
2276
2277static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2278{
2279        struct drbd_peer_request *peer_req =
2280                container_of(w, struct drbd_peer_request, w);
2281        struct drbd_peer_device *peer_device = peer_req->peer_device;
2282        int err;
2283
2284        err = drbd_send_ack(peer_device, ack, peer_req);
2285        dec_unacked(peer_device->device);
2286
2287        return err;
2288}
2289
2290static int e_send_superseded(struct drbd_work *w, int unused)
2291{
2292        return e_send_ack(w, P_SUPERSEDED);
2293}
2294
2295static int e_send_retry_write(struct drbd_work *w, int unused)
2296{
2297        struct drbd_peer_request *peer_req =
2298                container_of(w, struct drbd_peer_request, w);
2299        struct drbd_connection *connection = peer_req->peer_device->connection;
2300
2301        return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2302                             P_RETRY_WRITE : P_SUPERSEDED);
2303}
2304
2305static bool seq_greater(u32 a, u32 b)
2306{
2307        /*
2308         * We assume 32-bit wrap-around here.
2309         * For 24-bit wrap-around, we would have to shift:
2310         *  a <<= 8; b <<= 8;
2311         */
2312        return (s32)a - (s32)b > 0;
2313}
2314
2315static u32 seq_max(u32 a, u32 b)
2316{
2317        return seq_greater(a, b) ? a : b;
2318}
2319
2320static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2321{
2322        struct drbd_device *device = peer_device->device;
2323        unsigned int newest_peer_seq;
2324
2325        if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2326                spin_lock(&device->peer_seq_lock);
2327                newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2328                device->peer_seq = newest_peer_seq;
2329                spin_unlock(&device->peer_seq_lock);
2330                /* wake up only if we actually changed device->peer_seq */
2331                if (peer_seq == newest_peer_seq)
2332                        wake_up(&device->seq_wait);
2333        }
2334}
2335
2336static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2337{
2338        return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2339}
2340
2341/* maybe change sync_ee into interval trees as well? */
2342static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2343{
2344        struct drbd_peer_request *rs_req;
2345        bool rv = false;
2346
2347        spin_lock_irq(&device->resource->req_lock);
2348        list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2349                if (overlaps(peer_req->i.sector, peer_req->i.size,
2350                             rs_req->i.sector, rs_req->i.size)) {
2351                        rv = true;
2352                        break;
2353                }
2354        }
2355        spin_unlock_irq(&device->resource->req_lock);
2356
2357        return rv;
2358}
2359
2360/* Called from receive_Data.
2361 * Synchronize packets on sock with packets on msock.
2362 *
2363 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2364 * packet traveling on msock, they are still processed in the order they have
2365 * been sent.
2366 *
2367 * Note: we don't care for Ack packets overtaking P_DATA packets.
2368 *
2369 * In case packet_seq is larger than device->peer_seq number, there are
2370 * outstanding packets on the msock. We wait for them to arrive.
2371 * In case we are the logically next packet, we update device->peer_seq
2372 * ourselves. Correctly handles 32bit wrap around.
2373 *
2374 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2375 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2376 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2377 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
2378 *
2379 * returns 0 if we may process the packet,
2380 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2381static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2382{
2383        struct drbd_device *device = peer_device->device;
2384        DEFINE_WAIT(wait);
2385        long timeout;
2386        int ret = 0, tp;
2387
2388        if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2389                return 0;
2390
2391        spin_lock(&device->peer_seq_lock);
2392        for (;;) {
2393                if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2394                        device->peer_seq = seq_max(device->peer_seq, peer_seq);
2395                        break;
2396                }
2397
2398                if (signal_pending(current)) {
2399                        ret = -ERESTARTSYS;
2400                        break;
2401                }
2402
2403                rcu_read_lock();
2404                tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2405                rcu_read_unlock();
2406
2407                if (!tp)
2408                        break;
2409
2410                /* Only need to wait if two_primaries is enabled */
2411                prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2412                spin_unlock(&device->peer_seq_lock);
2413                rcu_read_lock();
2414                timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2415                rcu_read_unlock();
2416                timeout = schedule_timeout(timeout);
2417                spin_lock(&device->peer_seq_lock);
2418                if (!timeout) {
2419                        ret = -ETIMEDOUT;
2420                        drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2421                        break;
2422                }
2423        }
2424        spin_unlock(&device->peer_seq_lock);
2425        finish_wait(&device->seq_wait, &wait);
2426        return ret;
2427}
2428
2429/* see also bio_flags_to_wire()
2430 * DRBD_REQ_*, because we need to semantically map the flags to data packet
2431 * flags and back. We may replicate to other kernel versions. */
2432static unsigned long wire_flags_to_bio_flags(u32 dpf)
2433{
2434        return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2435                (dpf & DP_FUA ? REQ_FUA : 0) |
2436                (dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
2437}
2438
2439static unsigned long wire_flags_to_bio_op(u32 dpf)
2440{
2441        if (dpf & DP_ZEROES)
2442                return REQ_OP_WRITE_ZEROES;
2443        if (dpf & DP_DISCARD)
2444                return REQ_OP_DISCARD;
2445        if (dpf & DP_WSAME)
2446                return REQ_OP_WRITE_SAME;
2447        else
2448                return REQ_OP_WRITE;
2449}
2450
2451static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2452                                    unsigned int size)
2453{
2454        struct drbd_interval *i;
2455
2456    repeat:
2457        drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2458                struct drbd_request *req;
2459                struct bio_and_error m;
2460
2461                if (!i->local)
2462                        continue;
2463                req = container_of(i, struct drbd_request, i);
2464                if (!(req->rq_state & RQ_POSTPONED))
2465                        continue;
2466                req->rq_state &= ~RQ_POSTPONED;
2467                __req_mod(req, NEG_ACKED, &m);
2468                spin_unlock_irq(&device->resource->req_lock);
2469                if (m.bio)
2470                        complete_master_bio(device, &m);
2471                spin_lock_irq(&device->resource->req_lock);
2472                goto repeat;
2473        }
2474}
2475
2476static int handle_write_conflicts(struct drbd_device *device,
2477                                  struct drbd_peer_request *peer_req)
2478{
2479        struct drbd_connection *connection = peer_req->peer_device->connection;
2480        bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2481        sector_t sector = peer_req->i.sector;
2482        const unsigned int size = peer_req->i.size;
2483        struct drbd_interval *i;
2484        bool equal;
2485        int err;
2486
2487        /*
2488         * Inserting the peer request into the write_requests tree will prevent
2489         * new conflicting local requests from being added.
2490         */
2491        drbd_insert_interval(&device->write_requests, &peer_req->i);
2492
2493    repeat:
2494        drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2495                if (i == &peer_req->i)
2496                        continue;
2497                if (i->completed)
2498                        continue;
2499
2500                if (!i->local) {
2501                        /*
2502                         * Our peer has sent a conflicting remote request; this
2503                         * should not happen in a two-node setup.  Wait for the
2504                         * earlier peer request to complete.
2505                         */
2506                        err = drbd_wait_misc(device, i);
2507                        if (err)
2508                                goto out;
2509                        goto repeat;
2510                }
2511
2512                equal = i->sector == sector && i->size == size;
2513                if (resolve_conflicts) {
2514                        /*
2515                         * If the peer request is fully contained within the
2516                         * overlapping request, it can be considered overwritten
2517                         * and thus superseded; otherwise, it will be retried
2518                         * once all overlapping requests have completed.
2519                         */
2520                        bool superseded = i->sector <= sector && i->sector +
2521                                       (i->size >> 9) >= sector + (size >> 9);
2522
2523                        if (!equal)
2524                                drbd_alert(device, "Concurrent writes detected: "
2525                                               "local=%llus +%u, remote=%llus +%u, "
2526                                               "assuming %s came first\n",
2527                                          (unsigned long long)i->sector, i->size,
2528                                          (unsigned long long)sector, size,
2529                                          superseded ? "local" : "remote");
2530
2531                        peer_req->w.cb = superseded ? e_send_superseded :
2532                                                   e_send_retry_write;
2533                        list_add_tail(&peer_req->w.list, &device->done_ee);
2534                        queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);
2535
2536                        err = -ENOENT;
2537                        goto out;
2538                } else {
2539                        struct drbd_request *req =
2540                                container_of(i, struct drbd_request, i);
2541
2542                        if (!equal)
2543                                drbd_alert(device, "Concurrent writes detected: "
2544                                               "local=%llus +%u, remote=%llus +%u\n",
2545                                          (unsigned long long)i->sector, i->size,
2546                                          (unsigned long long)sector, size);
2547
2548                        if (req->rq_state & RQ_LOCAL_PENDING ||
2549                            !(req->rq_state & RQ_POSTPONED)) {
2550                                /*
2551                                 * Wait for the node with the discard flag to
2552                                 * decide if this request has been superseded
2553                                 * or needs to be retried.
2554                                 * Requests that have been superseded will
2555                                 * disappear from the write_requests tree.
2556                                 *
2557                                 * In addition, wait for the conflicting
2558                                 * request to finish locally before submitting
2559                                 * the conflicting peer request.
2560                                 */
2561                                err = drbd_wait_misc(device, &req->i);
2562                                if (err) {
2563                                        _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2564                                        fail_postponed_requests(device, sector, size);
2565                                        goto out;
2566                                }
2567                                goto repeat;
2568                        }
2569                        /*
2570                         * Remember to restart the conflicting requests after
2571                         * the new peer request has completed.
2572                         */
2573                        peer_req->flags |= EE_RESTART_REQUESTS;
2574                }
2575        }
2576        err = 0;
2577
2578    out:
2579        if (err)
2580                drbd_remove_epoch_entry_interval(device, peer_req);
2581        return err;
2582}
2583
2584/* mirrored write */
2585static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2586{
2587        struct drbd_peer_device *peer_device;
2588        struct drbd_device *device;
2589        struct net_conf *nc;
2590        sector_t sector;
2591        struct drbd_peer_request *peer_req;
2592        struct p_data *p = pi->data;
2593        u32 peer_seq = be32_to_cpu(p->seq_num);
2594        int op, op_flags;
2595        u32 dp_flags;
2596        int err, tp;
2597
2598        peer_device = conn_peer_device(connection, pi->vnr);
2599        if (!peer_device)
2600                return -EIO;
2601        device = peer_device->device;
2602
2603        if (!get_ldev(device)) {
2604                int err2;
2605
2606                err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2607                drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2608                atomic_inc(&connection->current_epoch->epoch_size);
2609                err2 = drbd_drain_block(peer_device, pi->size);
2610                if (!err)
2611                        err = err2;
2612                return err;
2613        }
2614
2615        /*
2616         * Corresponding put_ldev done either below (on various errors), or in
2617         * drbd_peer_request_endio, if we successfully submit the data at the
2618         * end of this function.
2619         */
2620
2621        sector = be64_to_cpu(p->sector);
2622        peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2623        if (!peer_req) {
2624                put_ldev(device);
2625                return -EIO;
2626        }
2627
2628        peer_req->w.cb = e_end_block;
2629        peer_req->submit_jif = jiffies;
2630        peer_req->flags |= EE_APPLICATION;
2631
2632        dp_flags = be32_to_cpu(p->dp_flags);
2633        op = wire_flags_to_bio_op(dp_flags);
2634        op_flags = wire_flags_to_bio_flags(dp_flags);
2635        if (pi->cmd == P_TRIM) {
2636                D_ASSERT(peer_device, peer_req->i.size > 0);
2637                D_ASSERT(peer_device, op == REQ_OP_DISCARD);
2638                D_ASSERT(peer_device, peer_req->pages == NULL);
2639                /* need to play safe: an older DRBD sender
2640                 * may mean zero-out while sending P_TRIM. */
2641                if (0 == (connection->agreed_features & DRBD_FF_WZEROES))
2642                        peer_req->flags |= EE_ZEROOUT;
2643        } else if (pi->cmd == P_ZEROES) {
2644                D_ASSERT(peer_device, peer_req->i.size > 0);
2645                D_ASSERT(peer_device, op == REQ_OP_WRITE_ZEROES);
2646                D_ASSERT(peer_device, peer_req->pages == NULL);
2647                /* Do (not) pass down BLKDEV_ZERO_NOUNMAP? */
2648                if (dp_flags & DP_DISCARD)
2649                        peer_req->flags |= EE_TRIM;
2650        } else if (peer_req->pages == NULL) {
2651                D_ASSERT(device, peer_req->i.size == 0);
2652                D_ASSERT(device, dp_flags & DP_FLUSH);
2653        }
2654
2655        if (dp_flags & DP_MAY_SET_IN_SYNC)
2656                peer_req->flags |= EE_MAY_SET_IN_SYNC;
2657
2658        spin_lock(&connection->epoch_lock);
2659        peer_req->epoch = connection->current_epoch;
2660        atomic_inc(&peer_req->epoch->epoch_size);
2661        atomic_inc(&peer_req->epoch->active);
2662        spin_unlock(&connection->epoch_lock);
2663
2664        rcu_read_lock();
2665        nc = rcu_dereference(peer_device->connection->net_conf);
2666        tp = nc->two_primaries;
2667        if (peer_device->connection->agreed_pro_version < 100) {
2668                switch (nc->wire_protocol) {
2669                case DRBD_PROT_C:
2670                        dp_flags |= DP_SEND_WRITE_ACK;
2671                        break;
2672                case DRBD_PROT_B:
2673                        dp_flags |= DP_SEND_RECEIVE_ACK;
2674                        break;
2675                }
2676        }
2677        rcu_read_unlock();
2678
2679        if (dp_flags & DP_SEND_WRITE_ACK) {
2680                peer_req->flags |= EE_SEND_WRITE_ACK;
2681                inc_unacked(device);
2682                /* corresponding dec_unacked() in e_end_block()
2683                 * respective _drbd_clear_done_ee */
2684        }
2685
2686        if (dp_flags & DP_SEND_RECEIVE_ACK) {
2687                /* I really don't like it that the receiver thread
2688                 * sends on the msock, but anyways */
2689                drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
2690        }
2691
2692        if (tp) {
2693                /* two primaries implies protocol C */
2694                D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2695                peer_req->flags |= EE_IN_INTERVAL_TREE;
2696                err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2697                if (err)
2698                        goto out_interrupted;
2699                spin_lock_irq(&device->resource->req_lock);
2700                err = handle_write_conflicts(device, peer_req);
2701                if (err) {
2702                        spin_unlock_irq(&device->resource->req_lock);
2703                        if (err == -ENOENT) {
2704                                put_ldev(device);
2705                                return 0;
2706                        }
2707                        goto out_interrupted;
2708                }
2709        } else {
2710                update_peer_seq(peer_device, peer_seq);
2711                spin_lock_irq(&device->resource->req_lock);
2712        }
2713        /* TRIM and WRITE_SAME are processed synchronously,
2714         * we wait for all pending requests, respectively wait for
2715         * active_ee to become empty in drbd_submit_peer_request();
2716         * better not add ourselves here. */
2717        if ((peer_req->flags & (EE_TRIM|EE_WRITE_SAME|EE_ZEROOUT)) == 0)
2718                list_add_tail(&peer_req->w.list, &device->active_ee);
2719        spin_unlock_irq(&device->resource->req_lock);
2720
2721        if (device->state.conn == C_SYNC_TARGET)
2722                wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2723
2724        if (device->state.pdsk < D_INCONSISTENT) {
2725                /* In case we have the only disk of the cluster, */
2726                drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2727                peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2728                drbd_al_begin_io(device, &peer_req->i);
2729                peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2730        }
2731
2732        err = drbd_submit_peer_request(device, peer_req, op, op_flags,
2733                                       DRBD_FAULT_DT_WR);
2734        if (!err)
2735                return 0;
2736
2737        /* don't care for the reason here */
2738        drbd_err(device, "submit failed, triggering re-connect\n");
2739        spin_lock_irq(&device->resource->req_lock);
2740        list_del(&peer_req->w.list);
2741        drbd_remove_epoch_entry_interval(device, peer_req);
2742        spin_unlock_irq(&device->resource->req_lock);
2743        if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2744                peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2745                drbd_al_complete_io(device, &peer_req->i);
2746        }
2747
2748out_interrupted:
2749        drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
2750        put_ldev(device);
2751        drbd_free_peer_req(device, peer_req);
2752        return err;
2753}
2754
2755/* We may throttle resync, if the lower device seems to be busy,
2756 * and current sync rate is above c_min_rate.
2757 *
2758 * To decide whether or not the lower device is busy, we use a scheme similar
2759 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2760 * (more than 64 sectors) of activity we cannot account for with our own resync
2761 * activity, it obviously is "busy".
2762 *
2763 * The current sync rate used here uses only the most recent two step marks,
2764 * to have a short time average so we can react faster.
2765 */
2766bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2767                bool throttle_if_app_is_waiting)
2768{
2769        struct lc_element *tmp;
2770        bool throttle = drbd_rs_c_min_rate_throttle(device);
2771
2772        if (!throttle || throttle_if_app_is_waiting)
2773                return throttle;
2774
2775        spin_lock_irq(&device->al_lock);
2776        tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2777        if (tmp) {
2778                struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2779                if (test_bit(BME_PRIORITY, &bm_ext->flags))
2780                        throttle = false;
2781                /* Do not slow down if app IO is already waiting for this extent,
2782                 * and our progress is necessary for application IO to complete. */
2783        }
2784        spin_unlock_irq(&device->al_lock);
2785
2786        return throttle;
2787}
2788
2789bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2790{
2791        struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2792        unsigned long db, dt, dbdt;
2793        unsigned int c_min_rate;
2794        int curr_events;
2795
2796        rcu_read_lock();
2797        c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2798        rcu_read_unlock();
2799
2800        /* feature disabled? */
2801        if (c_min_rate == 0)
2802                return false;
2803
2804        curr_events = (int)part_stat_read_accum(&disk->part0, sectors) -
2805                        atomic_read(&device->rs_sect_ev);
2806
2807        if (atomic_read(&device->ap_actlog_cnt)
2808            || curr_events - device->rs_last_events > 64) {
2809                unsigned long rs_left;
2810                int i;
2811
2812                device->rs_last_events = curr_events;
2813
2814                /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2815                 * approx. */
2816                i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2817
2818                if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2819                        rs_left = device->ov_left;
2820                else
2821                        rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2822
2823                dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2824                if (!dt)
2825                        dt++;
2826                db = device->rs_mark_left[i] - rs_left;
2827                dbdt = Bit2KB(db/dt);
2828
2829                if (dbdt > c_min_rate)
2830                        return true;
2831        }
2832        return false;
2833}
2834
2835static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2836{
2837        struct drbd_peer_device *peer_device;
2838        struct drbd_device *device;
2839        sector_t sector;
2840        sector_t capacity;
2841        struct drbd_peer_request *peer_req;
2842        struct digest_info *di = NULL;
2843        int size, verb;
2844        unsigned int fault_type;
2845        struct p_block_req *p = pi->data;
2846
2847        peer_device = conn_peer_device(connection, pi->vnr);
2848        if (!peer_device)
2849                return -EIO;
2850        device = peer_device->device;
2851        capacity = drbd_get_capacity(device->this_bdev);
2852
2853        sector = be64_to_cpu(p->sector);
2854        size   = be32_to_cpu(p->blksize);
2855
2856        if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2857                drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2858                                (unsigned long long)sector, size);
2859                return -EINVAL;
2860        }
2861        if (sector + (size>>9) > capacity) {
2862                drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2863                                (unsigned long long)sector, size);
2864                return -EINVAL;
2865        }
2866
2867        if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2868                verb = 1;
2869                switch (pi->cmd) {
2870                case P_DATA_REQUEST:
2871                        drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2872                        break;
2873                case P_RS_THIN_REQ:
2874                case P_RS_DATA_REQUEST:
2875                case P_CSUM_RS_REQUEST:
2876                case P_OV_REQUEST:
2877                        drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2878                        break;
2879                case P_OV_REPLY:
2880                        verb = 0;
2881                        dec_rs_pending(device);
2882                        drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2883                        break;
2884                default:
2885                        BUG();
2886                }
2887                if (verb && __ratelimit(&drbd_ratelimit_state))
2888                        drbd_err(device, "Can not satisfy peer's read request, "
2889                            "no local data.\n");
2890
2891                /* drain possibly payload */
2892                return drbd_drain_block(peer_device, pi->size);
2893        }
2894
2895        /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2896         * "criss-cross" setup, that might cause write-out on some other DRBD,
2897         * which in turn might block on the other node at this very place.  */
2898        peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2899                        size, GFP_NOIO);
2900        if (!peer_req) {
2901                put_ldev(device);
2902                return -ENOMEM;
2903        }
2904
2905        switch (pi->cmd) {
2906        case P_DATA_REQUEST:
2907                peer_req->w.cb = w_e_end_data_req;
2908                fault_type = DRBD_FAULT_DT_RD;
2909                /* application IO, don't drbd_rs_begin_io */
2910                peer_req->flags |= EE_APPLICATION;
2911                goto submit;
2912
2913        case P_RS_THIN_REQ:
2914                /* If at some point in the future we have a smart way to
2915                   find out if this data block is completely deallocated,
2916                   then we would do something smarter here than reading
2917                   the block... */
2918                peer_req->flags |= EE_RS_THIN_REQ;
2919                /* fall through */
2920        case P_RS_DATA_REQUEST:
2921                peer_req->w.cb = w_e_end_rsdata_req;
2922                fault_type = DRBD_FAULT_RS_RD;
2923                /* used in the sector offset progress display */
2924                device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2925                break;
2926
2927        case P_OV_REPLY:
2928        case P_CSUM_RS_REQUEST:
2929                fault_type = DRBD_FAULT_RS_RD;
2930                di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2931                if (!di)
2932                        goto out_free_e;
2933
2934                di->digest_size = pi->size;
2935                di->digest = (((char *)di)+sizeof(struct digest_info));
2936
2937                peer_req->digest = di;
2938                peer_req->flags |= EE_HAS_DIGEST;
2939
2940                if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2941                        goto out_free_e;
2942
2943                if (pi->cmd == P_CSUM_RS_REQUEST) {
2944                        D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2945                        peer_req->w.cb = w_e_end_csum_rs_req;
2946                        /* used in the sector offset progress display */
2947                        device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2948                        /* remember to report stats in drbd_resync_finished */
2949                        device->use_csums = true;
2950                } else if (pi->cmd == P_OV_REPLY) {
2951                        /* track progress, we may need to throttle */
2952                        atomic_add(size >> 9, &device->rs_sect_in);
2953                        peer_req->w.cb = w_e_end_ov_reply;
2954                        dec_rs_pending(device);
2955                        /* drbd_rs_begin_io done when we sent this request,
2956                         * but accounting still needs to be done. */
2957                        goto submit_for_resync;
2958                }
2959                break;
2960
2961        case P_OV_REQUEST:
2962                if (device->ov_start_sector == ~(sector_t)0 &&
2963                    peer_device->connection->agreed_pro_version >= 90) {
2964                        unsigned long now = jiffies;
2965                        int i;
2966                        device->ov_start_sector = sector;
2967                        device->ov_position = sector;
2968                        device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2969                        device->rs_total = device->ov_left;
2970                        for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2971                                device->rs_mark_left[i] = device->ov_left;
2972                                device->rs_mark_time[i] = now;
2973                        }
2974                        drbd_info(device, "Online Verify start sector: %llu\n",
2975                                        (unsigned long long)sector);
2976                }
2977                peer_req->w.cb = w_e_end_ov_req;
2978                fault_type = DRBD_FAULT_RS_RD;
2979                break;
2980
2981        default:
2982                BUG();
2983        }
2984
2985        /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2986         * wrt the receiver, but it is not as straightforward as it may seem.
2987         * Various places in the resync start and stop logic assume resync
2988         * requests are processed in order, requeuing this on the worker thread
2989         * introduces a bunch of new code for synchronization between threads.
2990         *
2991         * Unlimited throttling before drbd_rs_begin_io may stall the resync
2992         * "forever", throttling after drbd_rs_begin_io will lock that extent
2993         * for application writes for the same time.  For now, just throttle
2994         * here, where the rest of the code expects the receiver to sleep for
2995         * a while, anyways.
2996         */
2997
2998        /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2999         * this defers syncer requests for some time, before letting at least
3000         * on request through.  The resync controller on the receiving side
3001         * will adapt to the incoming rate accordingly.
3002         *
3003         * We cannot throttle here if remote is Primary/SyncTarget:
3004         * we would also throttle its application reads.
3005         * In that case, throttling is done on the SyncTarget only.
3006         */
3007
3008        /* Even though this may be a resync request, we do add to "read_ee";
3009         * "sync_ee" is only used for resync WRITEs.
3010         * Add to list early, so debugfs can find this request
3011         * even if we have to sleep below. */
3012        spin_lock_irq(&device->resource->req_lock);
3013        list_add_tail(&peer_req->w.list, &device->read_ee);
3014        spin_unlock_irq(&device->resource->req_lock);
3015
3016        update_receiver_timing_details(connection, drbd_rs_should_slow_down);
3017        if (device->state.peer != R_PRIMARY
3018        && drbd_rs_should_slow_down(device, sector, false))
3019                schedule_timeout_uninterruptible(HZ/10);
3020        update_receiver_timing_details(connection, drbd_rs_begin_io);
3021        if (drbd_rs_begin_io(device, sector))
3022                goto out_free_e;
3023
3024submit_for_resync:
3025        atomic_add(size >> 9, &device->rs_sect_ev);
3026
3027submit:
3028        update_receiver_timing_details(connection, drbd_submit_peer_request);
3029        inc_unacked(device);
3030        if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
3031                                     fault_type) == 0)
3032                return 0;
3033
3034        /* don't care for the reason here */
3035        drbd_err(device, "submit failed, triggering re-connect\n");
3036
3037out_free_e:
3038        spin_lock_irq(&device->resource->req_lock);
3039        list_del(&peer_req->w.list);
3040        spin_unlock_irq(&device->resource->req_lock);
3041        /* no drbd_rs_complete_io(), we are dropping the connection anyways */
3042
3043        put_ldev(device);
3044        drbd_free_peer_req(device, peer_req);
3045        return -EIO;
3046}
3047
3048/**
3049 * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
3050 */
3051static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
3052{
3053        struct drbd_device *device = peer_device->device;
3054        int self, peer, rv = -100;
3055        unsigned long ch_self, ch_peer;
3056        enum drbd_after_sb_p after_sb_0p;
3057
3058        self = device->ldev->md.uuid[UI_BITMAP] & 1;
3059        peer = device->p_uuid[UI_BITMAP] & 1;
3060
3061        ch_peer = device->p_uuid[UI_SIZE];
3062        ch_self = device->comm_bm_set;
3063
3064        rcu_read_lock();
3065        after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
3066        rcu_read_unlock();
3067        switch (after_sb_0p) {
3068        case ASB_CONSENSUS:
3069        case ASB_DISCARD_SECONDARY:
3070        case ASB_CALL_HELPER:
3071        case ASB_VIOLENTLY:
3072                drbd_err(device, "Configuration error.\n");
3073                break;
3074        case ASB_DISCONNECT:
3075                break;
3076        case ASB_DISCARD_YOUNGER_PRI:
3077                if (self == 0 && peer == 1) {
3078                        rv = -1;
3079                        break;
3080                }
3081                if (self == 1 && peer == 0) {
3082                        rv =  1;
3083                        break;
3084                }
3085                /* Else fall through - to one of the other strategies... */
3086        case ASB_DISCARD_OLDER_PRI:
3087                if (self == 0 && peer == 1) {
3088                        rv = 1;
3089                        break;
3090                }
3091                if (self == 1 && peer == 0) {
3092                        rv = -1;
3093                        break;
3094                }
3095                /* Else fall through to one of the other strategies... */
3096                drbd_warn(device, "Discard younger/older primary did not find a decision\n"
3097                     "Using discard-least-changes instead\n");
3098                /* fall through */
3099        case ASB_DISCARD_ZERO_CHG:
3100                if (ch_peer == 0 && ch_self == 0) {
3101                        rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
3102                                ? -1 : 1;
3103                        break;
3104                } else {
3105                        if (ch_peer == 0) { rv =  1; break; }
3106                        if (ch_self == 0) { rv = -1; break; }
3107                }
3108                if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
3109                        break;
3110                /* else, fall through */
3111        case ASB_DISCARD_LEAST_CHG:
3112                if      (ch_self < ch_peer)
3113                        rv = -1;
3114                else if (ch_self > ch_peer)
3115                        rv =  1;
3116                else /* ( ch_self == ch_peer ) */
3117                     /* Well, then use something else. */
3118                        rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
3119                                ? -1 : 1;
3120                break;
3121        case ASB_DISCARD_LOCAL:
3122                rv = -1;
3123                break;
3124        case ASB_DISCARD_REMOTE:
3125                rv =  1;
3126        }
3127
3128        return rv;
3129}
3130
3131/**
3132 * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
3133 */
3134static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
3135{
3136        struct drbd_device *device = peer_device->device;
3137        int hg, rv = -100;
3138        enum drbd_after_sb_p after_sb_1p;
3139
3140        rcu_read_lock();
3141        after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
3142        rcu_read_unlock();
3143        switch (after_sb_1p) {
3144        case ASB_DISCARD_YOUNGER_PRI:
3145        case ASB_DISCARD_OLDER_PRI:
3146        case ASB_DISCARD_LEAST_CHG:
3147        case ASB_DISCARD_LOCAL:
3148        case ASB_DISCARD_REMOTE:
3149        case ASB_DISCARD_ZERO_CHG:
3150                drbd_err(device, "Configuration error.\n");
3151                break;
3152        case ASB_DISCONNECT:
3153                break;
3154        case ASB_CONSENSUS:
3155                hg = drbd_asb_recover_0p(peer_device);
3156                if (hg == -1 && device->state.role == R_SECONDARY)
3157                        rv = hg;
3158                if (hg == 1  && device->state.role == R_PRIMARY)
3159                        rv = hg;
3160                break;
3161        case ASB_VIOLENTLY:
3162                rv = drbd_asb_recover_0p(peer_device);
3163                break;
3164        case ASB_DISCARD_SECONDARY:
3165                return device->state.role == R_PRIMARY ? 1 : -1;
3166        case ASB_CALL_HELPER:
3167                hg = drbd_asb_recover_0p(peer_device);
3168                if (hg == -1 && device->state.role == R_PRIMARY) {
3169                        enum drbd_state_rv rv2;
3170
3171                         /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3172                          * we might be here in C_WF_REPORT_PARAMS which is transient.
3173                          * we do not need to wait for the after state change work either. */
3174                        rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3175                        if (rv2 != SS_SUCCESS) {
3176                                drbd_khelper(device, "pri-lost-after-sb");
3177                        } else {
3178                                drbd_warn(device, "Successfully gave up primary role.\n");
3179                                rv = hg;
3180                        }
3181                } else
3182                        rv = hg;
3183        }
3184
3185        return rv;
3186}
3187
3188/**
3189 * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
3190 */
3191static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
3192{
3193        struct drbd_device *device = peer_device->device;
3194        int hg, rv = -100;
3195        enum drbd_after_sb_p after_sb_2p;
3196
3197        rcu_read_lock();
3198        after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
3199        rcu_read_unlock();
3200        switch (after_sb_2p) {
3201        case ASB_DISCARD_YOUNGER_PRI:
3202        case ASB_DISCARD_OLDER_PRI:
3203        case ASB_DISCARD_LEAST_CHG:
3204        case ASB_DISCARD_LOCAL:
3205        case ASB_DISCARD_REMOTE:
3206        case ASB_CONSENSUS:
3207        case ASB_DISCARD_SECONDARY:
3208        case ASB_DISCARD_ZERO_CHG:
3209                drbd_err(device, "Configuration error.\n");
3210                break;
3211        case ASB_VIOLENTLY:
3212                rv = drbd_asb_recover_0p(peer_device);
3213                break;
3214        case ASB_DISCONNECT:
3215                break;
3216        case ASB_CALL_HELPER:
3217                hg = drbd_asb_recover_0p(peer_device);
3218                if (hg == -1) {
3219                        enum drbd_state_rv rv2;
3220
3221                         /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3222                          * we might be here in C_WF_REPORT_PARAMS which is transient.
3223                          * we do not need to wait for the after state change work either. */
3224                        rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3225                        if (rv2 != SS_SUCCESS) {
3226                                drbd_khelper(device, "pri-lost-after-sb");
3227                        } else {
3228                                drbd_warn(device, "Successfully gave up primary role.\n");
3229                                rv = hg;
3230                        }
3231                } else
3232                        rv = hg;
3233        }
3234
3235        return rv;
3236}
3237
3238static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
3239                           u64 bits, u64 flags)
3240{
3241        if (!uuid) {
3242                drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
3243                return;
3244        }
3245        drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
3246             text,
3247             (unsigned long long)uuid[UI_CURRENT],
3248             (unsigned long long)uuid[UI_BITMAP],
3249             (unsigned long long)uuid[UI_HISTORY_START],
3250             (unsigned long long)uuid[UI_HISTORY_END],
3251             (unsigned long long)bits,
3252             (unsigned long long)flags);
3253}
3254
3255/*
3256  100   after split brain try auto recover
3257    2   C_SYNC_SOURCE set BitMap
3258    1   C_SYNC_SOURCE use BitMap
3259    0   no Sync
3260   -1   C_SYNC_TARGET use BitMap
3261   -2   C_SYNC_TARGET set BitMap
3262 -100   after split brain, disconnect
3263-1000   unrelated data
3264-1091   requires proto 91
3265-1096   requires proto 96
3266 */
3267
3268static int drbd_uuid_compare(struct drbd_device *const device, enum drbd_role const peer_role, int *rule_nr) __must_hold(local)
3269{
3270        struct drbd_peer_device *const peer_device = first_peer_device(device);
3271        struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
3272        u64 self, peer;
3273        int i, j;
3274
3275        self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3276        peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3277
3278        *rule_nr = 10;
3279        if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
3280                return 0;
3281
3282        *rule_nr = 20;
3283        if ((self == UUID_JUST_CREATED || self == (u64)0) &&
3284             peer != UUID_JUST_CREATED)
3285                return -2;
3286
3287        *rule_nr = 30;
3288        if (self != UUID_JUST_CREATED &&
3289            (peer == UUID_JUST_CREATED || peer == (u64)0))
3290                return 2;
3291
3292        if (self == peer) {
3293                int rct, dc; /* roles at crash time */
3294
3295                if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
3296
3297                        if (connection->agreed_pro_version < 91)
3298                                return -1091;
3299
3300                        if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
3301                            (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
3302                                drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
3303                                drbd_uuid_move_history(device);
3304                                device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
3305                                device->ldev->md.uuid[UI_BITMAP] = 0;
3306
3307                                drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3308                                               device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3309                                *rule_nr = 34;
3310                        } else {
3311                                drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
3312                                *rule_nr = 36;
3313                        }
3314
3315                        return 1;
3316                }
3317
3318                if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
3319
3320                        if (connection->agreed_pro_version < 91)
3321                                return -1091;
3322
3323                        if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
3324                            (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
3325                                drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
3326
3327                                device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
3328                                device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
3329                                device->p_uuid[UI_BITMAP] = 0UL;
3330
3331                                drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3332                                *rule_nr = 35;
3333                        } else {
3334                                drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3335                                *rule_nr = 37;
3336                        }
3337
3338                        return -1;
3339                }
3340
3341                /* Common power [off|failure] */
3342                rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3343                        (device->p_uuid[UI_FLAGS] & 2);
3344                /* lowest bit is set when we were primary,
3345                 * next bit (weight 2) is set when peer was primary */
3346                *rule_nr = 40;
3347
3348                /* Neither has the "crashed primary" flag set,
3349                 * only a replication link hickup. */
3350                if (rct == 0)
3351                        return 0;
3352
3353                /* Current UUID equal and no bitmap uuid; does not necessarily
3354                 * mean this was a "simultaneous hard crash", maybe IO was
3355                 * frozen, so no UUID-bump happened.
3356                 * This is a protocol change, overload DRBD_FF_WSAME as flag
3357                 * for "new-enough" peer DRBD version. */
3358                if (device->state.role == R_PRIMARY || peer_role == R_PRIMARY) {
3359                        *rule_nr = 41;
3360                        if (!(connection->agreed_features & DRBD_FF_WSAME)) {
3361                                drbd_warn(peer_device, "Equivalent unrotated UUIDs, but current primary present.\n");
3362                                return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8));
3363                        }
3364                        if (device->state.role == R_PRIMARY && peer_role == R_PRIMARY) {
3365                                /* At least one has the "crashed primary" bit set,
3366                                 * both are primary now, but neither has rotated its UUIDs?
3367                                 * "Can not happen." */
3368                                drbd_err(peer_device, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n");
3369                                return -100;
3370                        }
3371                        if (device->state.role == R_PRIMARY)
3372                                return 1;
3373                        return -1;
3374                }
3375
3376                /* Both are secondary.
3377                 * Really looks like recovery from simultaneous hard crash.
3378                 * Check which had been primary before, and arbitrate. */
3379                switch (rct) {
3380                case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */
3381                case 1: /*  self_pri && !peer_pri */ return 1;
3382                case 2: /* !self_pri &&  peer_pri */ return -1;
3383                case 3: /*  self_pri &&  peer_pri */
3384                        dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3385                        return dc ? -1 : 1;
3386                }
3387        }
3388
3389        *rule_nr = 50;
3390        peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3391        if (self == peer)
3392                return -1;
3393
3394        *rule_nr = 51;
3395        peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3396        if (self == peer) {
3397                if (connection->agreed_pro_version < 96 ?
3398                    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3399                    (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3400                    peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3401                        /* The last P_SYNC_UUID did not get though. Undo the last start of
3402                           resync as sync source modifications of the peer's UUIDs. */
3403
3404                        if (connection->agreed_pro_version < 91)
3405                                return -1091;
3406
3407                        device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3408                        device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3409
3410                        drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3411                        drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3412
3413                        return -1;
3414                }
3415        }
3416
3417        *rule_nr = 60;
3418        self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3419        for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3420                peer = device->p_uuid[i] & ~((u64)1);
3421                if (self == peer)
3422                        return -2;
3423        }
3424
3425        *rule_nr = 70;
3426        self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3427        peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3428        if (self == peer)
3429                return 1;
3430
3431        *rule_nr = 71;
3432        self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3433        if (self == peer) {
3434                if (connection->agreed_pro_version < 96 ?
3435                    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3436                    (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3437                    self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3438                        /* The last P_SYNC_UUID did not get though. Undo the last start of
3439                           resync as sync source modifications of our UUIDs. */
3440
3441                        if (connection->agreed_pro_version < 91)
3442                                return -1091;
3443
3444                        __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3445                        __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3446
3447                        drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3448                        drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3449                                       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3450
3451                        return 1;
3452                }
3453        }
3454
3455
3456        *rule_nr = 80;
3457        peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3458        for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3459                self = device->ldev->md.uuid[i] & ~((u64)1);
3460                if (self == peer)
3461                        return 2;
3462        }
3463
3464        *rule_nr = 90;
3465        self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3466        peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3467        if (self == peer && self != ((u64)0))
3468                return 100;
3469
3470        *rule_nr = 100;
3471        for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3472                self = device->ldev->md.uuid[i] & ~((u64)1);
3473                for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3474                        peer = device->p_uuid[j] & ~((u64)1);
3475                        if (self == peer)
3476                                return -100;
3477                }
3478        }
3479
3480        return -1000;
3481}
3482
3483/* drbd_sync_handshake() returns the new conn state on success, or
3484   CONN_MASK (-1) on failure.
3485 */
3486static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3487                                           enum drbd_role peer_role,
3488                                           enum drbd_disk_state peer_disk) __must_hold(local)
3489{
3490        struct drbd_device *device = peer_device->device;
3491        enum drbd_conns rv = C_MASK;
3492        enum drbd_disk_state mydisk;
3493        struct net_conf *nc;
3494        int hg, rule_nr, rr_conflict, tentative, always_asbp;
3495
3496        mydisk = device->state.disk;
3497        if (mydisk == D_NEGOTIATING)
3498                mydisk = device->new_state_tmp.disk;
3499
3500        drbd_info(device, "drbd_sync_handshake:\n");
3501
3502        spin_lock_irq(&device->ldev->md.uuid_lock);
3503        drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3504        drbd_uuid_dump(device, "peer", device->p_uuid,
3505                       device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3506
3507        hg = drbd_uuid_compare(device, peer_role, &rule_nr);
3508        spin_unlock_irq(&device->ldev->md.uuid_lock);
3509
3510        drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3511
3512        if (hg == -1000) {
3513                drbd_alert(device, "Unrelated data, aborting!\n");
3514                return C_MASK;
3515        }
3516        if (hg < -0x10000) {
3517                int proto, fflags;
3518                hg = -hg;
3519                proto = hg & 0xff;
3520                fflags = (hg >> 8) & 0xff;
3521                drbd_alert(device, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n",
3522                                        proto, fflags);
3523                return C_MASK;
3524        }
3525        if (hg < -1000) {
3526                drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3527                return C_MASK;
3528        }
3529
3530        if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3531            (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3532                int f = (hg == -100) || abs(hg) == 2;
3533                hg = mydisk > D_INCONSISTENT ? 1 : -1;
3534                if (f)
3535                        hg = hg*2;
3536                drbd_info(device, "Becoming sync %s due to disk states.\n",
3537                     hg > 0 ? "source" : "target");
3538        }
3539
3540        if (abs(hg) == 100)
3541                drbd_khelper(device, "initial-split-brain");
3542
3543        rcu_read_lock();
3544        nc = rcu_dereference(peer_device->connection->net_conf);
3545        always_asbp = nc->always_asbp;
3546        rr_conflict = nc->rr_conflict;
3547        tentative = nc->tentative;
3548        rcu_read_unlock();
3549
3550        if (hg == 100 || (hg == -100 && always_asbp)) {
3551                int pcount = (device->state.role == R_PRIMARY)
3552                           + (peer_role == R_PRIMARY);
3553                int forced = (hg == -100);
3554
3555                switch (pcount) {
3556                case 0:
3557                        hg = drbd_asb_recover_0p(peer_device);
3558                        break;
3559                case 1:
3560                        hg = drbd_asb_recover_1p(peer_device);
3561                        break;
3562                case 2:
3563                        hg = drbd_asb_recover_2p(peer_device);
3564                        break;
3565                }
3566                if (abs(hg) < 100) {
3567                        drbd_warn(device, "Split-Brain detected, %d primaries, "
3568                             "automatically solved. Sync from %s node\n",
3569                             pcount, (hg < 0) ? "peer" : "this");
3570                        if (forced) {
3571                                drbd_warn(device, "Doing a full sync, since"
3572                                     " UUIDs where ambiguous.\n");
3573                                hg = hg*2;
3574                        }
3575                }
3576        }
3577
3578        if (hg == -100) {
3579                if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3580                        hg = -1;
3581                if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3582                        hg = 1;
3583
3584                if (abs(hg) < 100)
3585                        drbd_warn(device, "Split-Brain detected, manually solved. "
3586                             "Sync from %s node\n",
3587                             (hg < 0) ? "peer" : "this");
3588        }
3589
3590        if (hg == -100) {
3591                /* FIXME this log message is not correct if we end up here
3592                 * after an attempted attach on a diskless node.
3593                 * We just refuse to attach -- well, we drop the "connection"
3594                 * to that disk, in a way... */
3595                drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3596                drbd_khelper(device, "split-brain");
3597                return C_MASK;
3598        }
3599
3600        if (hg > 0 && mydisk <= D_INCONSISTENT) {
3601                drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3602                return C_MASK;
3603        }
3604
3605        if (hg < 0 && /* by intention we do not use mydisk here. */
3606            device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3607                switch (rr_conflict) {
3608                case ASB_CALL_HELPER:
3609                        drbd_khelper(device, "pri-lost");
3610                        /* fall through */
3611                case ASB_DISCONNECT:
3612                        drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3613                        return C_MASK;
3614                case ASB_VIOLENTLY:
3615                        drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3616                             "assumption\n");
3617                }
3618        }
3619
3620        if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3621                if (hg == 0)
3622                        drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3623                else
3624                        drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3625                                 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3626                                 abs(hg) >= 2 ? "full" : "bit-map based");
3627                return C_MASK;
3628        }
3629
3630        if (abs(hg) >= 2) {
3631                drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3632                if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3633                                        BM_LOCKED_SET_ALLOWED))
3634                        return C_MASK;
3635        }
3636
3637        if (hg > 0) { /* become sync source. */
3638                rv = C_WF_BITMAP_S;
3639        } else if (hg < 0) { /* become sync target */
3640                rv = C_WF_BITMAP_T;
3641        } else {
3642                rv = C_CONNECTED;
3643                if (drbd_bm_total_weight(device)) {
3644                        drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3645                             drbd_bm_total_weight(device));
3646                }
3647        }
3648
3649        return rv;
3650}
3651
3652static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3653{
3654        /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3655        if (peer == ASB_DISCARD_REMOTE)
3656                return ASB_DISCARD_LOCAL;
3657
3658        /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3659        if (peer == ASB_DISCARD_LOCAL)
3660                return ASB_DISCARD_REMOTE;
3661
3662        /* everything else is valid if they are equal on both sides. */
3663        return peer;
3664}
3665
3666static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3667{
3668        struct p_protocol *p = pi->data;
3669        enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3670        int p_proto, p_discard_my_data, p_two_primaries, cf;
3671        struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3672        char integrity_alg[SHARED_SECRET_MAX] = "";
3673        struct crypto_shash *peer_integrity_tfm = NULL;
3674        void *int_dig_in = NULL, *int_dig_vv = NULL;
3675
3676        p_proto         = be32_to_cpu(p->protocol);
3677        p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3678        p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3679        p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3680        p_two_primaries = be32_to_cpu(p->two_primaries);
3681        cf              = be32_to_cpu(p->conn_flags);
3682        p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3683
3684        if (connection->agreed_pro_version >= 87) {
3685                int err;
3686
3687                if (pi->size > sizeof(integrity_alg))
3688                        return -EIO;
3689                err = drbd_recv_all(connection, integrity_alg, pi->size);
3690                if (err)
3691                        return err;
3692                integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3693        }
3694
3695        if (pi->cmd != P_PROTOCOL_UPDATE) {
3696                clear_bit(CONN_DRY_RUN, &connection->flags);
3697
3698                if (cf & CF_DRY_RUN)
3699                        set_bit(CONN_DRY_RUN, &connection->flags);
3700
3701                rcu_read_lock();
3702                nc = rcu_dereference(connection->net_conf);
3703
3704                if (p_proto != nc->wire_protocol) {
3705                        drbd_err(connection, "incompatible %s settings\n", "protocol");
3706                        goto disconnect_rcu_unlock;
3707                }
3708
3709                if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3710                        drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3711                        goto disconnect_rcu_unlock;
3712                }
3713
3714                if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3715                        drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3716                        goto disconnect_rcu_unlock;
3717                }
3718
3719                if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3720                        drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3721                        goto disconnect_rcu_unlock;
3722                }
3723
3724                if (p_discard_my_data && nc->discard_my_data) {
3725                        drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3726                        goto disconnect_rcu_unlock;
3727                }
3728
3729                if (p_two_primaries != nc->two_primaries) {
3730                        drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3731                        goto disconnect_rcu_unlock;
3732                }
3733
3734                if (strcmp(integrity_alg, nc->integrity_alg)) {
3735                        drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3736                        goto disconnect_rcu_unlock;
3737                }
3738
3739                rcu_read_unlock();
3740        }
3741
3742        if (integrity_alg[0]) {
3743                int hash_size;
3744
3745                /*
3746                 * We can only change the peer data integrity algorithm
3747                 * here.  Changing our own data integrity algorithm
3748                 * requires that we send a P_PROTOCOL_UPDATE packet at
3749                 * the same time; otherwise, the peer has no way to
3750                 * tell between which packets the algorithm should
3751                 * change.
3752                 */
3753
3754                peer_integrity_tfm = crypto_alloc_shash(integrity_alg, 0, 0);
3755                if (IS_ERR(peer_integrity_tfm)) {
3756                        peer_integrity_tfm = NULL;
3757                        drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3758                                 integrity_alg);
3759                        goto disconnect;
3760                }
3761
3762                hash_size = crypto_shash_digestsize(peer_integrity_tfm);
3763                int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3764                int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3765                if (!(int_dig_in && int_dig_vv)) {
3766                        drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3767                        goto disconnect;
3768                }
3769        }
3770
3771        new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3772        if (!new_net_conf) {
3773                drbd_err(connection, "Allocation of new net_conf failed\n");
3774                goto disconnect;
3775        }
3776
3777        mutex_lock(&connection->data.mutex);
3778        mutex_lock(&connection->resource->conf_update);
3779        old_net_conf = connection->net_conf;
3780        *new_net_conf = *old_net_conf;
3781
3782        new_net_conf->wire_protocol = p_proto;
3783        new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3784        new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3785        new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3786        new_net_conf->two_primaries = p_two_primaries;
3787
3788        rcu_assign_pointer(connection->net_conf, new_net_conf);
3789        mutex_unlock(&connection->resource->conf_update);
3790        mutex_unlock(&connection->data.mutex);
3791
3792        crypto_free_shash(connection->peer_integrity_tfm);
3793        kfree(connection->int_dig_in);
3794        kfree(connection->int_dig_vv);
3795        connection->peer_integrity_tfm = peer_integrity_tfm;
3796        connection->int_dig_in = int_dig_in;
3797        connection->int_dig_vv = int_dig_vv;
3798
3799        if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3800                drbd_info(connection, "peer data-integrity-alg: %s\n",
3801                          integrity_alg[0] ? integrity_alg : "(none)");
3802
3803        synchronize_rcu();
3804        kfree(old_net_conf);
3805        return 0;
3806
3807disconnect_rcu_unlock:
3808        rcu_read_unlock();
3809disconnect:
3810        crypto_free_shash(peer_integrity_tfm);
3811        kfree(int_dig_in);
3812        kfree(int_dig_vv);
3813        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3814        return -EIO;
3815}
3816
3817/* helper function
3818 * input: alg name, feature name
3819 * return: NULL (alg name was "")
3820 *         ERR_PTR(error) if something goes wrong
3821 *         or the crypto hash ptr, if it worked out ok. */
3822static struct crypto_shash *drbd_crypto_alloc_digest_safe(
3823                const struct drbd_device *device,
3824                const char *alg, const char *name)
3825{
3826        struct crypto_shash *tfm;
3827
3828        if (!alg[0])
3829                return NULL;
3830
3831        tfm = crypto_alloc_shash(alg, 0, 0);
3832        if (IS_ERR(tfm)) {
3833                drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3834                        alg, name, PTR_ERR(tfm));
3835                return tfm;
3836        }
3837        return tfm;
3838}
3839
3840static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3841{
3842        void *buffer = connection->data.rbuf;
3843        int size = pi->size;
3844
3845        while (size) {
3846                int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3847                s = drbd_recv(connection, buffer, s);
3848                if (s <= 0) {
3849                        if (s < 0)
3850                                return s;
3851                        break;
3852                }
3853                size -= s;
3854        }
3855        if (size)
3856                return -EIO;
3857        return 0;
3858}
3859
3860/*
3861 * config_unknown_volume  -  device configuration command for unknown volume
3862 *
3863 * When a device is added to an existing connection, the node on which the
3864 * device is added first will send configuration commands to its peer but the
3865 * peer will not know about the device yet.  It will warn and ignore these
3866 * commands.  Once the device is added on the second node, the second node will
3867 * send the same device configuration commands, but in the other direction.
3868 *
3869 * (We can also end up here if drbd is misconfigured.)
3870 */
3871static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3872{
3873        drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3874                  cmdname(pi->cmd), pi->vnr);
3875        return ignore_remaining_packet(connection, pi);
3876}
3877
3878static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3879{
3880        struct drbd_peer_device *peer_device;
3881        struct drbd_device *device;
3882        struct p_rs_param_95 *p;
3883        unsigned int header_size, data_size, exp_max_sz;
3884        struct crypto_shash *verify_tfm = NULL;
3885        struct crypto_shash *csums_tfm = NULL;
3886        struct net_conf *old_net_conf, *new_net_conf = NULL;
3887        struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3888        const int apv = connection->agreed_pro_version;
3889        struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3890        int fifo_size = 0;
3891        int err;
3892
3893        peer_device = conn_peer_device(connection, pi->vnr);
3894        if (!peer_device)
3895                return config_unknown_volume(connection, pi);
3896        device = peer_device->device;
3897
3898        exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3899                    : apv == 88 ? sizeof(struct p_rs_param)
3900                                        + SHARED_SECRET_MAX
3901                    : apv <= 94 ? sizeof(struct p_rs_param_89)
3902                    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3903
3904        if (pi->size > exp_max_sz) {
3905                drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3906                    pi->size, exp_max_sz);
3907                return -EIO;
3908        }
3909
3910        if (apv <= 88) {
3911                header_size = sizeof(struct p_rs_param);
3912                data_size = pi->size - header_size;
3913        } else if (apv <= 94) {
3914                header_size = sizeof(struct p_rs_param_89);
3915                data_size = pi->size - header_size;
3916                D_ASSERT(device, data_size == 0);
3917        } else {
3918                header_size = sizeof(struct p_rs_param_95);
3919                data_size = pi->size - header_size;
3920                D_ASSERT(device, data_size == 0);
3921        }
3922
3923        /* initialize verify_alg and csums_alg */
3924        p = pi->data;
3925        memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3926
3927        err = drbd_recv_all(peer_device->connection, p, header_size);
3928        if (err)
3929                return err;
3930
3931        mutex_lock(&connection->resource->conf_update);
3932        old_net_conf = peer_device->connection->net_conf;
3933        if (get_ldev(device)) {
3934                new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3935                if (!new_disk_conf) {
3936                        put_ldev(device);
3937                        mutex_unlock(&connection->resource->conf_update);
3938                        drbd_err(device, "Allocation of new disk_conf failed\n");
3939                        return -ENOMEM;
3940                }
3941
3942                old_disk_conf = device->ldev->disk_conf;
3943                *new_disk_conf = *old_disk_conf;
3944
3945                new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3946        }
3947
3948        if (apv >= 88) {
3949                if (apv == 88) {
3950                        if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3951                                drbd_err(device, "verify-alg of wrong size, "
3952                                        "peer wants %u, accepting only up to %u byte\n",
3953                                        data_size, SHARED_SECRET_MAX);
3954                                err = -EIO;
3955                                goto reconnect;
3956                        }
3957
3958                        err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3959                        if (err)
3960                                goto reconnect;
3961                        /* we expect NUL terminated string */
3962                        /* but just in case someone tries to be evil */
3963                        D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3964                        p->verify_alg[data_size-1] = 0;
3965
3966                } else /* apv >= 89 */ {
3967                        /* we still expect NUL terminated strings */
3968                        /* but just in case someone tries to be evil */
3969                        D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3970                        D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3971                        p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3972                        p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3973                }
3974
3975                if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3976                        if (device->state.conn == C_WF_REPORT_PARAMS) {
3977                                drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3978                                    old_net_conf->verify_alg, p->verify_alg);
3979                                goto disconnect;
3980                        }
3981                        verify_tfm = drbd_crypto_alloc_digest_safe(device,
3982                                        p->verify_alg, "verify-alg");
3983                        if (IS_ERR(verify_tfm)) {
3984                                verify_tfm = NULL;
3985                                goto disconnect;
3986                        }
3987                }
3988
3989                if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3990                        if (device->state.conn == C_WF_REPORT_PARAMS) {
3991                                drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3992                                    old_net_conf->csums_alg, p->csums_alg);
3993                                goto disconnect;
3994                        }
3995                        csums_tfm = drbd_crypto_alloc_digest_safe(device,
3996                                        p->csums_alg, "csums-alg");
3997                        if (IS_ERR(csums_tfm)) {
3998                                csums_tfm = NULL;
3999                                goto disconnect;
4000                        }
4001                }
4002
4003                if (apv > 94 && new_disk_conf) {
4004                        new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
4005                        new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
4006                        new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
4007                        new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
4008
4009                        fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
4010                        if (fifo_size != device->rs_plan_s->size) {
4011                                new_plan = fifo_alloc(fifo_size);
4012                                if (!new_plan) {
4013                                        drbd_err(device, "kmalloc of fifo_buffer failed");
4014                                        put_ldev(device);
4015                                        goto disconnect;
4016                                }
4017                        }
4018                }
4019
4020                if (verify_tfm || csums_tfm) {
4021                        new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
4022                        if (!new_net_conf) {
4023                                drbd_err(device, "Allocation of new net_conf failed\n");
4024                                goto disconnect;
4025                        }
4026
4027                        *new_net_conf = *old_net_conf;
4028
4029                        if (verify_tfm) {
4030                                strcpy(new_net_conf->verify_alg, p->verify_alg);
4031                                new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
4032                                crypto_free_shash(peer_device->connection->verify_tfm);
4033                                peer_device->connection->verify_tfm = verify_tfm;
4034                                drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
4035                        }
4036                        if (csums_tfm) {
4037                                strcpy(new_net_conf->csums_alg, p->csums_alg);
4038                                new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
4039                                crypto_free_shash(peer_device->connection->csums_tfm);
4040                                peer_device->connection->csums_tfm = csums_tfm;
4041                                drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
4042                        }
4043                        rcu_assign_pointer(connection->net_conf, new_net_conf);
4044                }
4045        }
4046
4047        if (new_disk_conf) {
4048                rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
4049                put_ldev(device);
4050        }
4051
4052        if (new_plan) {
4053                old_plan = device->rs_plan_s;
4054                rcu_assign_pointer(device->rs_plan_s, new_plan);
4055        }
4056
4057        mutex_unlock(&connection->resource->conf_update);
4058        synchronize_rcu();
4059        if (new_net_conf)
4060                kfree(old_net_conf);
4061        kfree(old_disk_conf);
4062        kfree(old_plan);
4063
4064        return 0;
4065
4066reconnect:
4067        if (new_disk_conf) {
4068                put_ldev(device);
4069                kfree(new_disk_conf);
4070        }
4071        mutex_unlock(&connection->resource->conf_update);
4072        return -EIO;
4073
4074disconnect:
4075        kfree(new_plan);
4076        if (new_disk_conf) {
4077                put_ldev(device);
4078                kfree(new_disk_conf);
4079        }
4080        mutex_unlock(&connection->resource->conf_update);
4081        /* just for completeness: actually not needed,
4082         * as this is not reached if csums_tfm was ok. */
4083        crypto_free_shash(csums_tfm);
4084        /* but free the verify_tfm again, if csums_tfm did not work out */
4085        crypto_free_shash(verify_tfm);
4086        conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4087        return -EIO;
4088}
4089
4090/* warn if the arguments differ by more than 12.5% */
4091static void warn_if_differ_considerably(struct drbd_device *device,
4092        const char *s, sector_t a, sector_t b)
4093{
4094        sector_t d;
4095        if (a == 0 || b == 0)
4096                return;
4097        d = (a > b) ? (a - b) : (b - a);
4098        if (d > (a>>3) || d > (b>>3))
4099                drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
4100                     (unsigned long long)a, (unsigned long long)b);
4101}
4102
4103static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
4104{
4105        struct drbd_peer_device *peer_device;
4106        struct drbd_device *device;
4107        struct p_sizes *p = pi->data;
4108        struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ? p->qlim : NULL;
4109        enum determine_dev_size dd = DS_UNCHANGED;
4110        sector_t p_size, p_usize, p_csize, my_usize;
4111        sector_t new_size, cur_size;
4112        int ldsc = 0; /* local disk size changed */
4113        enum dds_flags ddsf;
4114
4115        peer_device = conn_peer_device(connection, pi->vnr);
4116        if (!peer_device)
4117                return config_unknown_volume(connection, pi);
4118        device = peer_device->device;
4119        cur_size = drbd_get_capacity(device->this_bdev);
4120
4121        p_size = be64_to_cpu(p->d_size);
4122        p_usize = be64_to_cpu(p->u_size);
4123        p_csize = be64_to_cpu(p->c_size);
4124
4125        /* just store the peer's disk size for now.
4126         * we still need to figure out whether we accept that. */
4127        device->p_size = p_size;
4128
4129        if (get_ldev(device)) {
4130                rcu_read_lock();
4131                my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
4132                rcu_read_unlock();
4133
4134                warn_if_differ_considerably(device, "lower level device sizes",
4135                           p_size, drbd_get_max_capacity(device->ldev));
4136                warn_if_differ_considerably(device, "user requested size",
4137                                            p_usize, my_usize);
4138
4139                /* if this is the first connect, or an otherwise expected
4140                 * param exchange, choose the minimum */
4141                if (device->state.conn == C_WF_REPORT_PARAMS)
4142                        p_usize = min_not_zero(my_usize, p_usize);
4143
4144                /* Never shrink a device with usable data during connect,
4145                 * or "attach" on the peer.
4146                 * But allow online shrinking if we are connected. */
4147                new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0);
4148                if (new_size < cur_size &&
4149                    device->state.disk >= D_OUTDATED &&
4150                    (device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS)) {
4151                        drbd_err(device, "The peer's disk size is too small! (%llu < %llu sectors)\n",
4152                                        (unsigned long long)new_size, (unsigned long long)cur_size);
4153                        conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4154                        put_ldev(device);
4155                        return -EIO;
4156                }
4157
4158                if (my_usize != p_usize) {
4159                        struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
4160
4161                        new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
4162                        if (!new_disk_conf) {
4163                                drbd_err(device, "Allocation of new disk_conf failed\n");
4164                                put_ldev(device);
4165                                return -ENOMEM;
4166                        }
4167
4168                        mutex_lock(&connection->resource->conf_update);
4169                        old_disk_conf = device->ldev->disk_conf;
4170                        *new_disk_conf = *old_disk_conf;
4171                        new_disk_conf->disk_size = p_usize;
4172
4173                        rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
4174                        mutex_unlock(&connection->resource->conf_update);
4175                        synchronize_rcu();
4176                        kfree(old_disk_conf);
4177
4178                        drbd_info(device, "Peer sets u_size to %lu sectors (old: %lu)\n",
4179                                 (unsigned long)p_usize, (unsigned long)my_usize);
4180                }
4181
4182                put_ldev(device);
4183        }
4184
4185        device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
4186        /* Leave drbd_reconsider_queue_parameters() before drbd_determine_dev_size().
4187           In case we cleared the QUEUE_FLAG_DISCARD from our queue in
4188           drbd_reconsider_queue_parameters(), we can be sure that after
4189           drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
4190
4191        ddsf = be16_to_cpu(p->dds_flags);
4192        if (get_ldev(device)) {
4193                drbd_reconsider_queue_parameters(device, device->ldev, o);
4194                dd = drbd_determine_dev_size(device, ddsf, NULL);
4195                put_ldev(device);
4196                if (dd == DS_ERROR)
4197                        return -EIO;
4198                drbd_md_sync(device);
4199        } else {
4200                /*
4201                 * I am diskless, need to accept the peer's *current* size.
4202                 * I must NOT accept the peers backing disk size,
4203                 * it may have been larger than mine all along...
4204                 *
4205                 * At this point, the peer knows more about my disk, or at
4206                 * least about what we last agreed upon, than myself.
4207                 * So if his c_size is less than his d_size, the most likely
4208                 * reason is that *my* d_size was smaller last time we checked.
4209                 *
4210                 * However, if he sends a zero current size,
4211                 * take his (user-capped or) backing disk size anyways.
4212                 *
4213                 * Unless of course he does not have a disk himself.
4214                 * In which case we ignore this completely.
4215                 */
4216                sector_t new_size = p_csize ?: p_usize ?: p_size;
4217                drbd_reconsider_queue_parameters(device, NULL, o);
4218                if (new_size == 0) {
4219                        /* Ignore, peer does not know nothing. */
4220                } else if (new_size == cur_size) {
4221                        /* nothing to do */
4222                } else if (cur_size != 0 && p_size == 0) {
4223                        drbd_warn(device, "Ignored diskless peer device size (peer:%llu != me:%llu sectors)!\n",
4224                                        (unsigned long long)new_size, (unsigned long long)cur_size);
4225                } else if (new_size < cur_size && device->state.role == R_PRIMARY) {
4226                        drbd_err(device, "The peer's device size is too small! (%llu < %llu sectors); demote me first!\n",
4227                                        (unsigned long long)new_size, (unsigned long long)cur_size);
4228                        conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4229                        return -EIO;
4230                } else {
4231                        /* I believe the peer, if
4232                         *  - I don't have a current size myself
4233                         *  - we agree on the size anyways
4234                         *  - I do have a current size, am Secondary,
4235                         *    and he has the only disk
4236                         *  - I do have a current size, am Primary,
4237                         *    and he has the only disk,
4238                         *    which is larger than my current size
4239                         */
4240                        drbd_set_my_capacity(device, new_size);
4241                }
4242        }
4243
4244        if (get_ldev(device)) {
4245                if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
4246                        device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
4247                        ldsc = 1;
4248                }
4249
4250                put_ldev(device);
4251        }
4252
4253        if (device->state.conn > C_WF_REPORT_PARAMS) {
4254                if (be64_to_cpu(p->c_size) !=
4255                    drbd_get_capacity(device->this_bdev) || ldsc) {
4256                        /* we have different sizes, probably peer
4257                         * needs to know my new size... */
4258                        drbd_send_sizes(peer_device, 0, ddsf);
4259                }
4260                if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
4261                    (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
4262                        if (device->state.pdsk >= D_INCONSISTENT &&
4263                            device->state.disk >= D_INCONSISTENT) {
4264                                if (ddsf & DDSF_NO_RESYNC)
4265                                        drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
4266                                else
4267                                        resync_after_online_grow(device);
4268                        } else
4269                                set_bit(RESYNC_AFTER_NEG, &device->flags);
4270                }
4271        }
4272
4273        return 0;
4274}
4275
4276static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
4277{
4278        struct drbd_peer_device *peer_device;
4279        struct drbd_device *device;
4280        struct p_uuids *p = pi->data;
4281        u64 *p_uuid;
4282        int i, updated_uuids = 0;
4283
4284        peer_device = conn_peer_device(connection, pi->vnr);
4285        if (!peer_device)
4286                return config_unknown_volume(connection, pi);
4287        device = peer_device->device;
4288
4289        p_uuid = kmalloc_array(UI_EXTENDED_SIZE, sizeof(*p_uuid), GFP_NOIO);
4290        if (!p_uuid) {
4291                drbd_err(device, "kmalloc of p_uuid failed\n");
4292                return false;
4293        }
4294
4295        for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
4296                p_uuid[i] = be64_to_cpu(p->uuid[i]);
4297
4298        kfree(device->p_uuid);
4299        device->p_uuid = p_uuid;
4300
4301        if ((device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS) &&
4302            device->state.disk < D_INCONSISTENT &&
4303            device->state.role == R_PRIMARY &&
4304            (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
4305                drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
4306                    (unsigned long long)device->ed_uuid);
4307                conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4308                return -EIO;
4309        }
4310
4311        if (get_ldev(device)) {
4312                int skip_initial_sync =
4313                        device->state.conn == C_CONNECTED &&
4314                        peer_device->connection->agreed_pro_version >= 90 &&
4315                        device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
4316                        (p_uuid[UI_FLAGS] & 8);
4317                if (skip_initial_sync) {
4318                        drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
4319                        drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
4320                                        "clear_n_write from receive_uuids",
4321                                        BM_LOCKED_TEST_ALLOWED);
4322                        _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
4323                        _drbd_uuid_set(device, UI_BITMAP, 0);
4324                        _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
4325                                        CS_VERBOSE, NULL);
4326                        drbd_md_sync(device);
4327                        updated_uuids = 1;
4328                }
4329                put_ldev(device);
4330        } else if (device->state.disk < D_INCONSISTENT &&
4331                   device->state.role == R_PRIMARY) {
4332                /* I am a diskless primary, the peer just created a new current UUID
4333                   for me. */
4334                updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4335        }
4336
4337        /* Before we test for the disk state, we should wait until an eventually
4338           ongoing cluster wide state change is finished. That is important if
4339           we are primary and are detaching from our disk. We need to see the
4340           new disk state... */
4341        mutex_lock(device->state_mutex);
4342        mutex_unlock(device->state_mutex);
4343        if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
4344                updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4345
4346        if (updated_uuids)
4347                drbd_print_uuids(device, "receiver updated UUIDs to");
4348
4349        return 0;
4350}
4351
4352/**
4353 * convert_state() - Converts the peer's view of the cluster state to our point of view
4354 * @ps:         The state as seen by the peer.
4355 */
4356static union drbd_state convert_state(union drbd_state ps)
4357{
4358        union drbd_state ms;
4359
4360        static enum drbd_conns c_tab[] = {
4361                [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
4362                [C_CONNECTED] = C_CONNECTED,
4363
4364                [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
4365                [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
4366                [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
4367                [C_VERIFY_S]       = C_VERIFY_T,
4368                [C_MASK]   = C_MASK,
4369        };
4370
4371        ms.i = ps.i;
4372
4373        ms.conn = c_tab[ps.conn];
4374        ms.peer = ps.role;
4375        ms.role = ps.peer;
4376        ms.pdsk = ps.disk;
4377        ms.disk = ps.pdsk;
4378        ms.peer_isp = (ps.aftr_isp | ps.user_isp);
4379
4380        return ms;
4381}
4382
4383static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4384{
4385        struct drbd_peer_device *peer_device;
4386        struct drbd_device *device;
4387        struct p_req_state *p = pi->data;
4388        union drbd_state mask, val;
4389        enum drbd_state_rv rv;
4390
4391        peer_device = conn_peer_device(connection, pi->vnr);
4392        if (!peer_device)
4393                return -EIO;
4394        device = peer_device->device;
4395
4396        mask.i = be32_to_cpu(p->mask);
4397        val.i = be32_to_cpu(p->val);
4398
4399        if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4400            mutex_is_locked(device->state_mutex)) {
4401                drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4402                return 0;
4403        }
4404
4405        mask = convert_state(mask);
4406        val = convert_state(val);
4407
4408        rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4409        drbd_send_sr_reply(peer_device, rv);
4410
4411        drbd_md_sync(device);
4412
4413        return 0;
4414}
4415
4416static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4417{
4418        struct p_req_state *p = pi->data;
4419        union drbd_state mask, val;
4420        enum drbd_state_rv rv;
4421
4422        mask.i = be32_to_cpu(p->mask);
4423        val.i = be32_to_cpu(p->val);
4424
4425        if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4426            mutex_is_locked(&connection->cstate_mutex)) {
4427                conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4428                return 0;
4429        }
4430
4431        mask = convert_state(mask);
4432        val = convert_state(val);
4433
4434        rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4435        conn_send_sr_reply(connection, rv);
4436
4437        return 0;
4438}
4439
4440static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4441{
4442        struct drbd_peer_device *peer_device;
4443        struct drbd_device *device;
4444        struct p_state *p = pi->data;
4445        union drbd_state os, ns, peer_state;
4446        enum drbd_disk_state real_peer_disk;
4447        enum chg_state_flags cs_flags;
4448        int rv;
4449
4450        peer_device = conn_peer_device(connection, pi->vnr);
4451        if (!peer_device)
4452                return config_unknown_volume(connection, pi);
4453        device = peer_device->device;
4454
4455        peer_state.i = be32_to_cpu(p->state);
4456
4457        real_peer_disk = peer_state.disk;
4458        if (peer_state.disk == D_NEGOTIATING) {
4459                real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4460                drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4461        }
4462
4463        spin_lock_irq(&device->resource->req_lock);
4464 retry:
4465        os = ns = drbd_read_state(device);
4466        spin_unlock_irq(&device->resource->req_lock);
4467
4468        /* If some other part of the code (ack_receiver thread, timeout)
4469         * already decided to close the connection again,
4470         * we must not "re-establish" it here. */
4471        if (os.conn <= C_TEAR_DOWN)
4472                return -ECONNRESET;
4473
4474        /* If this is the "end of sync" confirmation, usually the peer disk
4475         * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4476         * set) resync started in PausedSyncT, or if the timing of pause-/
4477         * unpause-sync events has been "just right", the peer disk may
4478         * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4479         */
4480        if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4481            real_peer_disk == D_UP_TO_DATE &&
4482            os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4483                /* If we are (becoming) SyncSource, but peer is still in sync
4484                 * preparation, ignore its uptodate-ness to avoid flapping, it
4485                 * will change to inconsistent once the peer reaches active
4486                 * syncing states.
4487                 * It may have changed syncer-paused flags, however, so we
4488                 * cannot ignore this completely. */
4489                if (peer_state.conn > C_CONNECTED &&
4490                    peer_state.conn < C_SYNC_SOURCE)
4491                        real_peer_disk = D_INCONSISTENT;
4492
4493                /* if peer_state changes to connected at the same time,
4494                 * it explicitly notifies us that it finished resync.
4495                 * Maybe we should finish it up, too? */
4496                else if (os.conn >= C_SYNC_SOURCE &&
4497                         peer_state.conn == C_CONNECTED) {
4498                        if (drbd_bm_total_weight(device) <= device->rs_failed)
4499                                drbd_resync_finished(device);
4500                        return 0;
4501                }
4502        }
4503
4504        /* explicit verify finished notification, stop sector reached. */
4505        if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4506            peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4507                ov_out_of_sync_print(device);
4508                drbd_resync_finished(device);
4509                return 0;
4510        }
4511
4512        /* peer says his disk is inconsistent, while we think it is uptodate,
4513         * and this happens while the peer still thinks we have a sync going on,
4514         * but we think we are already done with the sync.
4515         * We ignore this to avoid flapping pdsk.
4516         * This should not happen, if the peer is a recent version of drbd. */
4517        if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4518            os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4519                real_peer_disk = D_UP_TO_DATE;
4520
4521        if (ns.conn == C_WF_REPORT_PARAMS)
4522                ns.conn = C_CONNECTED;
4523
4524        if (peer_state.conn == C_AHEAD)
4525                ns.conn = C_BEHIND;
4526
4527        /* TODO:
4528         * if (primary and diskless and peer uuid != effective uuid)
4529         *     abort attach on peer;
4530         *
4531         * If this node does not have good data, was already connected, but
4532         * the peer did a late attach only now, trying to "negotiate" with me,
4533         * AND I am currently Primary, possibly frozen, with some specific
4534         * "effective" uuid, this should never be reached, really, because
4535         * we first send the uuids, then the current state.
4536         *
4537         * In this scenario, we already dropped the connection hard
4538         * when we received the unsuitable uuids (receive_uuids().
4539         *
4540         * Should we want to change this, that is: not drop the connection in
4541         * receive_uuids() already, then we would need to add a branch here
4542         * that aborts the attach of "unsuitable uuids" on the peer in case
4543         * this node is currently Diskless Primary.
4544         */
4545
4546        if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4547            get_ldev_if_state(device, D_NEGOTIATING)) {
4548                int cr; /* consider resync */
4549
4550                /* if we established a new connection */
4551                cr  = (os.conn < C_CONNECTED);
4552                /* if we had an established connection
4553                 * and one of the nodes newly attaches a disk */
4554                cr |= (os.conn == C_CONNECTED &&
4555                       (peer_state.disk == D_NEGOTIATING ||
4556                        os.disk == D_NEGOTIATING));
4557                /* if we have both been inconsistent, and the peer has been
4558                 * forced to be UpToDate with --force */
4559                cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4560                /* if we had been plain connected, and the admin requested to
4561                 * start a sync by "invalidate" or "invalidate-remote" */
4562                cr |= (os.conn == C_CONNECTED &&
4563                                (peer_state.conn >= C_STARTING_SYNC_S &&
4564                                 peer_state.conn <= C_WF_BITMAP_T));
4565
4566                if (cr)
4567                        ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4568
4569                put_ldev(device);
4570                if (ns.conn == C_MASK) {
4571                        ns.conn = C_CONNECTED;
4572                        if (device->state.disk == D_NEGOTIATING) {
4573                                drbd_force_state(device, NS(disk, D_FAILED));
4574                        } else if (peer_state.disk == D_NEGOTIATING) {
4575                                drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4576                                peer_state.disk = D_DISKLESS;
4577                                real_peer_disk = D_DISKLESS;
4578                        } else {
4579                                if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4580                                        return -EIO;
4581                                D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4582                                conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4583                                return -EIO;
4584                        }
4585                }
4586        }
4587
4588        spin_lock_irq(&device->resource->req_lock);
4589        if (os.i != drbd_read_state(device).i)
4590                goto retry;
4591        clear_bit(CONSIDER_RESYNC, &device->flags);
4592        ns.peer = peer_state.role;
4593        ns.pdsk = real_peer_disk;
4594        ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4595        if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4596                ns.disk = device->new_state_tmp.disk;
4597        cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4598        if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4599            test_bit(NEW_CUR_UUID, &device->flags)) {
4600                /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4601                   for temporal network outages! */
4602                spin_unlock_irq(&device->resource->req_lock);
4603                drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4604                tl_clear(peer_device->connection);
4605                drbd_uuid_new_current(device);
4606                clear_bit(NEW_CUR_UUID, &device->flags);
4607                conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4608                return -EIO;
4609        }
4610        rv = _drbd_set_state(device, ns, cs_flags, NULL);
4611        ns = drbd_read_state(device);
4612        spin_unlock_irq(&device->resource->req_lock);
4613
4614        if (rv < SS_SUCCESS) {
4615                conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4616                return -EIO;
4617        }
4618
4619        if (os.conn > C_WF_REPORT_PARAMS) {
4620                if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4621                    peer_state.disk != D_NEGOTIATING ) {
4622                        /* we want resync, peer has not yet decided to sync... */
4623                        /* Nowadays only used when forcing a node into primary role and
4624                           setting its disk to UpToDate with that */
4625                        drbd_send_uuids(peer_device);
4626                        drbd_send_current_state(peer_device);
4627                }
4628        }
4629
4630        clear_bit(DISCARD_MY_DATA, &device->flags);
4631
4632        drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4633
4634        return 0;
4635}
4636
4637static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4638{
4639        struct drbd_peer_device *peer_device;
4640        struct drbd_device *device;
4641        struct p_rs_uuid *p = pi->data;
4642
4643        peer_device = conn_peer_device(connection, pi->vnr);
4644        if (!peer_device)
4645                return -EIO;
4646        device = peer_device->device;
4647
4648        wait_event(device->misc_wait,
4649                   device->state.conn == C_WF_SYNC_UUID ||
4650                   device->state.conn == C_BEHIND ||
4651                   device->state.conn < C_CONNECTED ||
4652                   device->state.disk < D_NEGOTIATING);
4653
4654        /* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4655
4656        /* Here the _drbd_uuid_ functions are right, current should
4657           _not_ be rotated into the history */
4658        if (get_ldev_if_state(device, D_NEGOTIATING)) {
4659                _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4660                _drbd_uuid_set(device, UI_BITMAP, 0UL);
4661
4662                drbd_print_uuids(device, "updated sync uuid");
4663                drbd_start_resync(device, C_SYNC_TARGET);
4664
4665                put_ldev(device);
4666        } else
4667                drbd_err(device, "Ignoring SyncUUID packet!\n");
4668
4669        return 0;
4670}
4671
4672/**
4673 * receive_bitmap_plain
4674 *
4675 * Return 0 when done, 1 when another iteration is needed, and a negative error
4676 * code upon failure.
4677 */
4678static int
4679receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4680                     unsigned long *p, struct bm_xfer_ctx *c)
4681{
4682        unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4683                                 drbd_header_size(peer_device->connection);
4684        unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4685                                       c->bm_words - c->word_offset);
4686        unsigned int want = num_words * sizeof(*p);
4687        int err;
4688
4689        if (want != size) {
4690                drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4691                return -EIO;
4692        }
4693        if (want == 0)
4694                return 0;
4695        err = drbd_recv_all(peer_device->connection, p, want);
4696        if (err)
4697                return err;
4698
4699        drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4700
4701        c->word_offset += num_words;
4702        c->bit_offset = c->word_offset * BITS_PER_LONG;
4703        if (c->bit_offset > c->bm_bits)
4704                c->bit_offset = c->bm_bits;
4705
4706        return 1;
4707}
4708
4709static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4710{
4711        return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4712}
4713
4714static int dcbp_get_start(struct p_compressed_bm *p)
4715{
4716        return (p->encoding & 0x80) != 0;
4717}
4718
4719static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4720{
4721        return (p->encoding >> 4) & 0x7;
4722}
4723
4724/**
4725 * recv_bm_rle_bits
4726 *
4727 * Return 0 when done, 1 when another iteration is needed, and a negative error
4728 * code upon failure.
4729 */
4730static int
4731recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4732                struct p_compressed_bm *p,
4733                 struct bm_xfer_ctx *c,
4734                 unsigned int len)
4735{
4736        struct bitstream bs;
4737        u64 look_ahead;
4738        u64 rl;
4739        u64 tmp;
4740        unsigned long s = c->bit_offset;
4741        unsigned long e;
4742        int toggle = dcbp_get_start(p);
4743        int have;
4744        int bits;
4745
4746        bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4747
4748        bits = bitstream_get_bits(&bs, &look_ahead, 64);
4749        if (bits < 0)
4750                return -EIO;
4751
4752        for (have = bits; have > 0; s += rl, toggle = !toggle) {
4753                bits = vli_decode_bits(&rl, look_ahead);
4754                if (bits <= 0)
4755                        return -EIO;
4756
4757                if (toggle) {
4758                        e = s + rl -1;
4759                        if (e >= c->bm_bits) {
4760                                drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4761                                return -EIO;
4762                        }
4763                        _drbd_bm_set_bits(peer_device->device, s, e);
4764                }
4765
4766                if (have < bits) {
4767                        drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4768                                have, bits, look_ahead,
4769                                (unsigned int)(bs.cur.b - p->code),
4770                                (unsigned int)bs.buf_len);
4771                        return -EIO;
4772                }
4773                /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4774                if (likely(bits < 64))
4775                        look_ahead >>= bits;
4776                else
4777                        look_ahead = 0;
4778                have -= bits;
4779
4780                bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4781                if (bits < 0)
4782                        return -EIO;
4783                look_ahead |= tmp << have;
4784                have += bits;
4785        }
4786
4787        c->bit_offset = s;
4788        bm_xfer_ctx_bit_to_word_offset(c);
4789
4790        return (s != c->bm_bits);
4791}
4792
4793/**
4794 * decode_bitmap_c
4795 *
4796 * Return 0 when done, 1 when another iteration is needed, and a negative error
4797 * code upon failure.
4798 */
4799static int
4800decode_bitmap_c(struct drbd_peer_device *peer_device,
4801                struct p_compressed_bm *p,
4802                struct bm_xfer_ctx *c,
4803                unsigned int len)
4804{
4805        if (dcbp_get_code(p) == RLE_VLI_Bits)
4806                return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4807
4808        /* other variants had been implemented for evaluation,
4809         * but have been dropped as this one turned out to be "best"
4810         * during all our tests. */
4811
4812        drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4813        conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4814        return -EIO;
4815}
4816
4817void INFO_bm_xfer_stats(struct drbd_device *device,
4818                const char *direction, struct bm_xfer_ctx *c)
4819{
4820        /* what would it take to transfer it "plaintext" */
4821        unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4822        unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4823        unsigned int plain =
4824                header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4825                c->bm_words * sizeof(unsigned long);
4826        unsigned int total = c->bytes[0] + c->bytes[1];
4827        unsigned int r;
4828
4829        /* total can not be zero. but just in case: */
4830        if (total == 0)
4831                return;
4832
4833        /* don't report if not compressed */
4834        if (total >= plain)
4835                return;
4836
4837        /* total < plain. check for overflow, still */
4838        r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4839                                    : (1000 * total / plain);
4840
4841        if (r > 1000)
4842                r = 1000;
4843
4844        r = 1000 - r;
4845        drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4846             "total %u; compression: %u.%u%%\n",
4847                        direction,
4848                        c->bytes[1], c->packets[1],
4849                        c->bytes[0], c->packets[0],
4850                        total, r/10, r % 10);
4851}
4852
4853/* Since we are processing the bitfield from lower addresses to higher,
4854   it does not matter if the process it in 32 bit chunks or 64 bit
4855   chunks as long as it is little endian. (Understand it as byte stream,
4856   beginning with the lowest byte...) If we would use big endian
4857   we would need to process it from the highest address to the lowest,
4858   in order to be agnostic to the 32 vs 64 bits issue.
4859
4860   returns 0 on failure, 1 if we successfully received it. */
4861static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4862{
4863        struct drbd_peer_device *peer_device;
4864        struct drbd_device *device;
4865        struct bm_xfer_ctx c;
4866        int err;
4867
4868        peer_device = conn_peer_device(connection, pi->vnr);
4869        if (!peer_device)
4870                return -EIO;
4871        device = peer_device->device;
4872
4873        drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4874        /* you are supposed to send additional out-of-sync information
4875         * if you actually set bits during this phase */
4876
4877        c = (struct bm_xfer_ctx) {
4878                .bm_bits = drbd_bm_bits(device),
4879                .bm_words = drbd_bm_words(device),
4880        };
4881
4882        for(;;) {
4883                if (pi->cmd == P_BITMAP)
4884                        err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4885                else if (pi->cmd == P_COMPRESSED_BITMAP) {
4886                        /* MAYBE: sanity check that we speak proto >= 90,
4887                         * and the feature is enabled! */
4888                        struct p_compressed_bm *p = pi->data;
4889
4890                        if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4891                                drbd_err(device, "ReportCBitmap packet too large\n");
4892                                err = -EIO;
4893                                goto out;
4894                        }
4895                        if (pi->size <= sizeof(*p)) {
4896                                drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4897                                err = -EIO;
4898                                goto out;
4899                        }
4900                        err = drbd_recv_all(peer_device->connection, p, pi->size);
4901                        if (err)
4902                               goto out;
4903                        err = decode_bitmap_c(peer_device, p, &c, pi->size);
4904                } else {
4905                        drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4906                        err = -EIO;
4907                        goto out;
4908                }
4909
4910                c.packets[pi->cmd == P_BITMAP]++;
4911                c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4912
4913                if (err <= 0) {
4914                        if (err < 0)
4915                                goto out;
4916                        break;
4917                }
4918                err = drbd_recv_header(peer_device->connection, pi);
4919                if (err)
4920                        goto out;
4921        }
4922
4923        INFO_bm_xfer_stats(device, "receive", &c);
4924
4925        if (device->state.conn == C_WF_BITMAP_T) {
4926                enum drbd_state_rv rv;
4927
4928                err = drbd_send_bitmap(device);
4929                if (err)
4930                        goto out;
4931                /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4932                rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4933                D_ASSERT(device, rv == SS_SUCCESS);
4934        } else if (device->state.conn != C_WF_BITMAP_S) {
4935                /* admin may have requested C_DISCONNECTING,
4936                 * other threads may have noticed network errors */
4937                drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4938                    drbd_conn_str(device->state.conn));
4939        }
4940        err = 0;
4941
4942 out:
4943        drbd_bm_unlock(device);
4944        if (!err && device->state.conn == C_WF_BITMAP_S)
4945                drbd_start_resync(device, C_SYNC_SOURCE);
4946        return err;
4947}
4948
4949static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4950{
4951        drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4952                 pi->cmd, pi->size);
4953
4954        return ignore_remaining_packet(connection, pi);
4955}
4956
4957static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4958{
4959        /* Make sure we've acked all the TCP data associated
4960         * with the data requests being unplugged */
4961        drbd_tcp_quickack(connection->data.socket);
4962
4963        return 0;
4964}
4965
4966static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4967{
4968        struct drbd_peer_device *peer_device;
4969        struct drbd_device *device;
4970        struct p_block_desc *p = pi->data;
4971
4972        peer_device = conn_peer_device(connection, pi->vnr);
4973        if (!peer_device)
4974                return -EIO;
4975        device = peer_device->device;
4976
4977        switch (device->state.conn) {
4978        case C_WF_SYNC_UUID:
4979        case C_WF_BITMAP_T:
4980        case C_BEHIND:
4981                        break;
4982        default:
4983                drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4984                                drbd_conn_str(device->state.conn));
4985        }
4986
4987        drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4988
4989        return 0;
4990}
4991
4992static int receive_rs_deallocated(struct drbd_connection *connection, struct packet_info *pi)
4993{
4994        struct drbd_peer_device *peer_device;
4995        struct p_block_desc *p = pi->data;
4996        struct drbd_device *device;
4997        sector_t sector;
4998        int size, err = 0;
4999
5000        peer_device = conn_peer_device(connection, pi->vnr);
5001        if (!peer_device)
5002                return -EIO;
5003        device = peer_device->device;
5004
5005        sector = be64_to_cpu(p->sector);
5006        size = be32_to_cpu(p->blksize);
5007
5008        dec_rs_pending(device);
5009
5010        if (get_ldev(device)) {
5011                struct drbd_peer_request *peer_req;
5012                const int op = REQ_OP_WRITE_ZEROES;
5013
5014                peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector,
5015                                               size, 0, GFP_NOIO);
5016                if (!peer_req) {
5017                        put_ldev(device);
5018                        return -ENOMEM;
5019                }
5020
5021                peer_req->w.cb = e_end_resync_block;
5022                peer_req->submit_jif = jiffies;
5023                peer_req->flags |= EE_TRIM;
5024
5025                spin_lock_irq(&device->resource->req_lock);
5026                list_add_tail(&peer_req->w.list, &device->sync_ee);
5027                spin_unlock_irq(&device->resource->req_lock);
5028
5029                atomic_add(pi->size >> 9, &device->rs_sect_ev);
5030                err = drbd_submit_peer_request(device, peer_req, op, 0, DRBD_FAULT_RS_WR);
5031
5032                if (err) {
5033                        spin_lock_irq(&device->resource->req_lock);
5034                        list_del(&peer_req->w.list);
5035                        spin_unlock_irq(&device->resource->req_lock);
5036
5037                        drbd_free_peer_req(device, peer_req);
5038                        put_ldev(device);
5039                        err = 0;
5040                        goto fail;
5041                }
5042
5043                inc_unacked(device);
5044
5045                /* No put_ldev() here. Gets called in drbd_endio_write_sec_final(),
5046                   as well as drbd_rs_complete_io() */
5047        } else {
5048        fail:
5049                drbd_rs_complete_io(device, sector);
5050                drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER);
5051        }
5052
5053        atomic_add(size >> 9, &device->rs_sect_in);
5054
5055        return err;
5056}
5057
5058struct data_cmd {
5059        int expect_payload;
5060        unsigned int pkt_size;
5061        int (*fn)(struct drbd_connection *, struct packet_info *);
5062};
5063
5064static struct data_cmd drbd_cmd_handler[] = {
5065        [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
5066        [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
5067        [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
5068        [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
5069        [P_BITMAP]          = { 1, 0, receive_bitmap } ,
5070        [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
5071        [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
5072        [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
5073        [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
5074        [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
5075        [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
5076        [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
5077        [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
5078        [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
5079        [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
5080        [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
5081        [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
5082        [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
5083        [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
5084        [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
5085        [P_RS_THIN_REQ]     = { 0, sizeof(struct p_block_req), receive_DataRequest },
5086        [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
5087        [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
5088        [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
5089        [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
5090        [P_TRIM]            = { 0, sizeof(struct p_trim), receive_Data },
5091        [P_ZEROES]          = { 0, sizeof(struct p_trim), receive_Data },
5092        [P_RS_DEALLOCATED]  = { 0, sizeof(struct p_block_desc), receive_rs_deallocated },
5093        [P_WSAME]           = { 1, sizeof(struct p_wsame), receive_Data },
5094};
5095
5096static void drbdd(struct drbd_connection *connection)
5097{
5098        struct packet_info pi;
5099        size_t shs; /* sub header size */
5100        int err;
5101
5102        while (get_t_state(&connection->receiver) == RUNNING) {
5103                struct data_cmd const *cmd;
5104
5105                drbd_thread_current_set_cpu(&connection->receiver);
5106                update_receiver_timing_details(connection, drbd_recv_header_maybe_unplug);
5107                if (drbd_recv_header_maybe_unplug(connection, &pi))
5108                        goto err_out;
5109
5110                cmd = &drbd_cmd_handler[pi.cmd];
5111                if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
5112                        drbd_err(connection, "Unexpected data packet %s (0x%04x)",
5113                                 cmdname(pi.cmd), pi.cmd);
5114                        goto err_out;
5115                }
5116
5117                shs = cmd->pkt_size;
5118                if (pi.cmd == P_SIZES && connection->agreed_features & DRBD_FF_WSAME)
5119                        shs += sizeof(struct o_qlim);
5120                if (pi.size > shs && !cmd->expect_payload) {
5121                        drbd_err(connection, "No payload expected %s l:%d\n",
5122                                 cmdname(pi.cmd), pi.size);
5123                        goto err_out;
5124                }
5125                if (pi.size < shs) {
5126                        drbd_err(connection, "%s: unexpected packet size, expected:%d received:%d\n",
5127                                 cmdname(pi.cmd), (int)shs, pi.size);
5128                        goto err_out;
5129                }
5130
5131                if (shs) {
5132                        update_receiver_timing_details(connection, drbd_recv_all_warn);
5133                        err = drbd_recv_all_warn(connection, pi.data, shs);
5134                        if (err)
5135                                goto err_out;
5136                        pi.size -= shs;
5137                }
5138
5139                update_receiver_timing_details(connection, cmd->fn);
5140                err = cmd->fn(connection, &pi);
5141                if (err) {
5142                        drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
5143                                 cmdname(pi.cmd), err, pi.size);
5144                        goto err_out;
5145                }
5146        }
5147        return;
5148
5149    err_out:
5150        conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
5151}
5152
5153static void conn_disconnect(struct drbd_connection *connection)
5154{
5155        struct drbd_peer_device *peer_device;
5156        enum drbd_conns oc;
5157        int vnr;
5158
5159        if (connection->cstate == C_STANDALONE)
5160                return;
5161
5162        /* We are about to start the cleanup after connection loss.
5163         * Make sure drbd_make_request knows about that.
5164         * Usually we should be in some network failure state already,
5165         * but just in case we are not, we fix it up here.
5166         */
5167        conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5168
5169        /* ack_receiver does not clean up anything. it must not interfere, either */
5170        drbd_thread_stop(&connection->ack_receiver);
5171        if (connection->ack_sender) {
5172                destroy_workqueue(connection->ack_sender);
5173                connection->ack_sender = NULL;
5174        }
5175        drbd_free_sock(connection);
5176
5177        rcu_read_lock();
5178        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5179                struct drbd_device *device = peer_device->device;
5180                kref_get(&device->kref);
5181                rcu_read_unlock();
5182                drbd_disconnected(peer_device);
5183                kref_put(&device->kref, drbd_destroy_device);
5184                rcu_read_lock();
5185        }
5186        rcu_read_unlock();
5187
5188        if (!list_empty(&connection->current_epoch->list))
5189                drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
5190        /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
5191        atomic_set(&connection->current_epoch->epoch_size, 0);
5192        connection->send.seen_any_write_yet = false;
5193
5194        drbd_info(connection, "Connection closed\n");
5195
5196        if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
5197                conn_try_outdate_peer_async(connection);
5198
5199        spin_lock_irq(&connection->resource->req_lock);
5200        oc = connection->cstate;
5201        if (oc >= C_UNCONNECTED)
5202                _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
5203
5204        spin_unlock_irq(&connection->resource->req_lock);
5205
5206        if (oc == C_DISCONNECTING)
5207                conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
5208}
5209
5210static int drbd_disconnected(struct drbd_peer_device *peer_device)
5211{
5212        struct drbd_device *device = peer_device->device;
5213        unsigned int i;
5214
5215        /* wait for current activity to cease. */
5216        spin_lock_irq(&device->resource->req_lock);
5217        _drbd_wait_ee_list_empty(device, &device->active_ee);
5218        _drbd_wait_ee_list_empty(device, &device->sync_ee);
5219        _drbd_wait_ee_list_empty(device, &device->read_ee);
5220        spin_unlock_irq(&device->resource->req_lock);
5221
5222        /* We do not have data structures that would allow us to
5223         * get the rs_pending_cnt down to 0 again.
5224         *  * On C_SYNC_TARGET we do not have any data structures describing
5225         *    the pending RSDataRequest's we have sent.
5226         *  * On C_SYNC_SOURCE there is no data structure that tracks
5227         *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
5228         *  And no, it is not the sum of the reference counts in the
5229         *  resync_LRU. The resync_LRU tracks the whole operation including
5230         *  the disk-IO, while the rs_pending_cnt only tracks the blocks
5231         *  on the fly. */
5232        drbd_rs_cancel_all(device);
5233        device->rs_total = 0;
5234        device->rs_failed = 0;
5235        atomic_set(&device->rs_pending_cnt, 0);
5236        wake_up(&device->misc_wait);
5237
5238        del_timer_sync(&device->resync_timer);
5239        resync_timer_fn(&device->resync_timer);
5240
5241        /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
5242         * w_make_resync_request etc. which may still be on the worker queue
5243         * to be "canceled" */
5244        drbd_flush_workqueue(&peer_device->connection->sender_work);
5245
5246        drbd_finish_peer_reqs(device);
5247
5248        /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
5249           might have issued a work again. The one before drbd_finish_peer_reqs() is
5250           necessary to reclain net_ee in drbd_finish_peer_reqs(). */
5251        drbd_flush_workqueue(&peer_device->connection->sender_work);
5252
5253        /* need to do it again, drbd_finish_peer_reqs() may have populated it
5254         * again via drbd_try_clear_on_disk_bm(). */
5255        drbd_rs_cancel_all(device);
5256
5257        kfree(device->p_uuid);
5258        device->p_uuid = NULL;
5259
5260        if (!drbd_suspended(device))
5261                tl_clear(peer_device->connection);
5262
5263        drbd_md_sync(device);
5264
5265        if (get_ldev(device)) {
5266                drbd_bitmap_io(device, &drbd_bm_write_copy_pages,
5267                                "write from disconnected", BM_LOCKED_CHANGE_ALLOWED);
5268                put_ldev(device);
5269        }
5270
5271        /* tcp_close and release of sendpage pages can be deferred.  I don't
5272         * want to use SO_LINGER, because apparently it can be deferred for
5273         * more than 20 seconds (longest time I checked).
5274         *
5275         * Actually we don't care for exactly when the network stack does its
5276         * put_page(), but release our reference on these pages right here.
5277         */
5278        i = drbd_free_peer_reqs(device, &device->net_ee);
5279        if (i)
5280                drbd_info(device, "net_ee not empty, killed %u entries\n", i);
5281        i = atomic_read(&device->pp_in_use_by_net);
5282        if (i)
5283                drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
5284        i = atomic_read(&device->pp_in_use);
5285        if (i)
5286                drbd_info(device, "pp_in_use = %d, expected 0\n", i);
5287
5288        D_ASSERT(device, list_empty(&device->read_ee));
5289        D_ASSERT(device, list_empty(&device->active_ee));
5290        D_ASSERT(device, list_empty(&device->sync_ee));
5291        D_ASSERT(device, list_empty(&device->done_ee));
5292
5293        return 0;
5294}
5295
5296/*
5297 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
5298 * we can agree on is stored in agreed_pro_version.
5299 *
5300 * feature flags and the reserved array should be enough room for future
5301 * enhancements of the handshake protocol, and possible plugins...
5302 *
5303 * for now, they are expected to be zero, but ignored.
5304 */
5305static int drbd_send_features(struct drbd_connection *connection)
5306{
5307        struct drbd_socket *sock;
5308        struct p_connection_features *p;
5309
5310        sock = &connection->data;
5311        p = conn_prepare_command(connection, sock);
5312        if (!p)
5313                return -EIO;
5314        memset(p, 0, sizeof(*p));
5315        p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
5316        p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
5317        p->feature_flags = cpu_to_be32(PRO_FEATURES);
5318        return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
5319}
5320
5321/*
5322 * return values:
5323 *   1 yes, we have a valid connection
5324 *   0 oops, did not work out, please try again
5325 *  -1 peer talks different language,
5326 *     no point in trying again, please go standalone.
5327 */
5328static int drbd_do_features(struct drbd_connection *connection)
5329{
5330        /* ASSERT current == connection->receiver ... */
5331        struct p_connection_features *p;
5332        const int expect = sizeof(struct p_connection_features);
5333        struct packet_info pi;
5334        int err;
5335
5336        err = drbd_send_features(connection);
5337        if (err)
5338                return 0;
5339
5340        err = drbd_recv_header(connection, &pi);
5341        if (err)
5342                return 0;
5343
5344        if (pi.cmd != P_CONNECTION_FEATURES) {
5345                drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
5346                         cmdname(pi.cmd), pi.cmd);
5347                return -1;
5348        }
5349
5350        if (pi.size != expect) {
5351                drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
5352                     expect, pi.size);
5353                return -1;
5354        }
5355
5356        p = pi.data;
5357        err = drbd_recv_all_warn(connection, p, expect);
5358        if (err)
5359                return 0;
5360
5361        p->protocol_min = be32_to_cpu(p->protocol_min);
5362        p->protocol_max = be32_to_cpu(p->protocol_max);
5363        if (p->protocol_max == 0)
5364                p->protocol_max = p->protocol_min;
5365
5366        if (PRO_VERSION_MAX < p->protocol_min ||
5367            PRO_VERSION_MIN > p->protocol_max)
5368                goto incompat;
5369
5370        connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
5371        connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
5372
5373        drbd_info(connection, "Handshake successful: "
5374             "Agreed network protocol version %d\n", connection->agreed_pro_version);
5375
5376        drbd_info(connection, "Feature flags enabled on protocol level: 0x%x%s%s%s%s.\n",
5377                  connection->agreed_features,
5378                  connection->agreed_features & DRBD_FF_TRIM ? " TRIM" : "",
5379                  connection->agreed_features & DRBD_FF_THIN_RESYNC ? " THIN_RESYNC" : "",
5380                  connection->agreed_features & DRBD_FF_WSAME ? " WRITE_SAME" : "",
5381                  connection->agreed_features & DRBD_FF_WZEROES ? " WRITE_ZEROES" :
5382                  connection->agreed_features ? "" : " none");
5383
5384        return 1;
5385
5386 incompat:
5387        drbd_err(connection, "incompatible DRBD dialects: "
5388            "I support %d-%d, peer supports %d-%d\n",
5389            PRO_VERSION_MIN, PRO_VERSION_MAX,
5390            p->protocol_min, p->protocol_max);
5391        return -1;
5392}
5393
5394#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
5395static int drbd_do_auth(struct drbd_connection *connection)
5396{
5397        drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
5398        drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
5399        return -1;
5400}
5401#else
5402#define CHALLENGE_LEN 64
5403
5404/* Return value:
5405        1 - auth succeeded,
5406        0 - failed, try again (network error),
5407        -1 - auth failed, don't try again.
5408*/
5409
5410static int drbd_do_auth(struct drbd_connection *connection)
5411{
5412        struct drbd_socket *sock;
5413        char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
5414        char *response = NULL;
5415        char *right_response = NULL;
5416        char *peers_ch = NULL;
5417        unsigned int key_len;
5418        char secret[SHARED_SECRET_MAX]; /* 64 byte */
5419        unsigned int resp_size;
5420        struct shash_desc *desc;
5421        struct packet_info pi;
5422        struct net_conf *nc;
5423        int err, rv;
5424
5425        /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
5426
5427        rcu_read_lock();
5428        nc = rcu_dereference(connection->net_conf);
5429        key_len = strlen(nc->shared_secret);
5430        memcpy(secret, nc->shared_secret, key_len);
5431        rcu_read_unlock();
5432
5433        desc = kmalloc(sizeof(struct shash_desc) +
5434                       crypto_shash_descsize(connection->cram_hmac_tfm),
5435                       GFP_KERNEL);
5436        if (!desc) {
5437                rv = -1;
5438                goto fail;
5439        }
5440        desc->tfm = connection->cram_hmac_tfm;
5441
5442        rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
5443        if (rv) {
5444                drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
5445                rv = -1;
5446                goto fail;
5447        }
5448
5449        get_random_bytes(my_challenge, CHALLENGE_LEN);
5450
5451        sock = &connection->data;
5452        if (!conn_prepare_command(connection, sock)) {
5453                rv = 0;
5454                goto fail;
5455        }
5456        rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
5457                                my_challenge, CHALLENGE_LEN);
5458        if (!rv)
5459                goto fail;
5460
5461        err = drbd_recv_header(connection, &pi);
5462        if (err) {
5463                rv = 0;
5464                goto fail;
5465        }
5466
5467        if (pi.cmd != P_AUTH_CHALLENGE) {
5468                drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
5469                         cmdname(pi.cmd), pi.cmd);
5470                rv = -1;
5471                goto fail;
5472        }
5473
5474        if (pi.size > CHALLENGE_LEN * 2) {
5475                drbd_err(connection, "expected AuthChallenge payload too big.\n");
5476                rv = -1;
5477                goto fail;
5478        }
5479
5480        if (pi.size < CHALLENGE_LEN) {
5481                drbd_err(connection, "AuthChallenge payload too small.\n");
5482                rv = -1;
5483                goto fail;
5484        }
5485
5486        peers_ch = kmalloc(pi.size, GFP_NOIO);
5487        if (peers_ch == NULL) {
5488                drbd_err(connection, "kmalloc of peers_ch failed\n");
5489                rv = -1;
5490                goto fail;
5491        }
5492
5493        err = drbd_recv_all_warn(connection, peers_ch, pi.size);
5494        if (err) {
5495                rv = 0;
5496                goto fail;
5497        }
5498
5499        if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
5500                drbd_err(connection, "Peer presented the same challenge!\n");
5501                rv = -1;
5502                goto fail;
5503        }
5504
5505        resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
5506        response = kmalloc(resp_size, GFP_NOIO);
5507        if (response == NULL) {
5508                drbd_err(connection, "kmalloc of response failed\n");
5509                rv = -1;
5510                goto fail;
5511        }
5512
5513        rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
5514        if (rv) {
5515                drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5516                rv = -1;
5517                goto fail;
5518        }
5519
5520        if (!conn_prepare_command(connection, sock)) {
5521                rv = 0;
5522                goto fail;
5523        }
5524        rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5525                                response, resp_size);
5526        if (!rv)
5527                goto fail;
5528
5529        err = drbd_recv_header(connection, &pi);
5530        if (err) {
5531                rv = 0;
5532                goto fail;
5533        }
5534
5535        if (pi.cmd != P_AUTH_RESPONSE) {
5536                drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5537                         cmdname(pi.cmd), pi.cmd);
5538                rv = 0;
5539                goto fail;
5540        }
5541
5542        if (pi.size != resp_size) {
5543                drbd_err(connection, "expected AuthResponse payload of wrong size\n");
5544                rv = 0;
5545                goto fail;
5546        }
5547
5548        err = drbd_recv_all_warn(connection, response , resp_size);
5549        if (err) {
5550                rv = 0;
5551                goto fail;
5552        }
5553
5554        right_response = kmalloc(resp_size, GFP_NOIO);
5555        if (right_response == NULL) {
5556                drbd_err(connection, "kmalloc of right_response failed\n");
5557                rv = -1;
5558                goto fail;
5559        }
5560
5561        rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
5562                                 right_response);
5563        if (rv) {
5564                drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5565                rv = -1;
5566                goto fail;
5567        }
5568
5569        rv = !memcmp(response, right_response, resp_size);
5570
5571        if (rv)
5572                drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5573                     resp_size);
5574        else
5575                rv = -1;
5576
5577 fail:
5578        kfree(peers_ch);
5579        kfree(response);
5580        kfree(right_response);
5581        if (desc) {
5582                shash_desc_zero(desc);
5583                kfree(desc);
5584        }
5585
5586        return rv;
5587}
5588#endif
5589
5590int drbd_receiver(struct drbd_thread *thi)
5591{
5592        struct drbd_connection *connection = thi->connection;
5593        int h;
5594
5595        drbd_info(connection, "receiver (re)started\n");
5596
5597        do {
5598                h = conn_connect(connection);
5599                if (h == 0) {
5600                        conn_disconnect(connection);
5601                        schedule_timeout_interruptible(HZ);
5602                }
5603                if (h == -1) {
5604                        drbd_warn(connection, "Discarding network configuration.\n");
5605                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5606                }
5607        } while (h == 0);
5608
5609        if (h > 0) {
5610                blk_start_plug(&connection->receiver_plug);
5611                drbdd(connection);
5612                blk_finish_plug(&connection->receiver_plug);
5613        }
5614
5615        conn_disconnect(connection);
5616
5617        drbd_info(connection, "receiver terminated\n");
5618        return 0;
5619}
5620
5621/* ********* acknowledge sender ******** */
5622
5623static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5624{
5625        struct p_req_state_reply *p = pi->data;
5626        int retcode = be32_to_cpu(p->retcode);
5627
5628        if (retcode >= SS_SUCCESS) {
5629                set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5630        } else {
5631                set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5632                drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5633                         drbd_set_st_err_str(retcode), retcode);
5634        }
5635        wake_up(&connection->ping_wait);
5636
5637        return 0;
5638}
5639
5640static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5641{
5642        struct drbd_peer_device *peer_device;
5643        struct drbd_device *device;
5644        struct p_req_state_reply *p = pi->data;
5645        int retcode = be32_to_cpu(p->retcode);
5646
5647        peer_device = conn_peer_device(connection, pi->vnr);
5648        if (!peer_device)
5649                return -EIO;
5650        device = peer_device->device;
5651
5652        if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5653                D_ASSERT(device, connection->agreed_pro_version < 100);
5654                return got_conn_RqSReply(connection, pi);
5655        }
5656
5657        if (retcode >= SS_SUCCESS) {
5658                set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5659        } else {
5660                set_bit(CL_ST_CHG_FAIL, &device->flags);
5661                drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5662                        drbd_set_st_err_str(retcode), retcode);
5663        }
5664        wake_up(&device->state_wait);
5665
5666        return 0;
5667}
5668
5669static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
5670{
5671        return drbd_send_ping_ack(connection);
5672
5673}
5674
5675static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5676{
5677        /* restore idle timeout */
5678        connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5679        if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5680                wake_up(&connection->ping_wait);
5681
5682        return 0;
5683}
5684
5685static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5686{
5687        struct drbd_peer_device *peer_device;
5688        struct drbd_device *device;
5689        struct p_block_ack *p = pi->data;
5690        sector_t sector = be64_to_cpu(p->sector);
5691        int blksize = be32_to_cpu(p->blksize);
5692
5693        peer_device = conn_peer_device(connection, pi->vnr);
5694        if (!peer_device)
5695                return -EIO;
5696        device = peer_device->device;
5697
5698        D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5699
5700        update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5701
5702        if (get_ldev(device)) {
5703                drbd_rs_complete_io(device, sector);
5704                drbd_set_in_sync(device, sector, blksize);
5705                /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5706                device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5707                put_ldev(device);
5708        }
5709        dec_rs_pending(device);
5710        atomic_add(blksize >> 9, &device->rs_sect_in);
5711
5712        return 0;
5713}
5714
5715static int
5716validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5717                              struct rb_root *root, const char *func,
5718                              enum drbd_req_event what, bool missing_ok)
5719{
5720        struct drbd_request *req;
5721        struct bio_and_error m;
5722
5723        spin_lock_irq(&device->resource->req_lock);
5724        req = find_request(device, root, id, sector, missing_ok, func);
5725        if (unlikely(!req)) {
5726                spin_unlock_irq(&device->resource->req_lock);
5727                return -EIO;
5728        }
5729        __req_mod(req, what, &m);
5730        spin_unlock_irq(&device->resource->req_lock);
5731
5732        if (m.bio)
5733                complete_master_bio(device, &m);
5734        return 0;
5735}
5736
5737static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5738{
5739        struct drbd_peer_device *peer_device;
5740        struct drbd_device *device;
5741        struct p_block_ack *p = pi->data;
5742        sector_t sector = be64_to_cpu(p->sector);
5743        int blksize = be32_to_cpu(p->blksize);
5744        enum drbd_req_event what;
5745
5746        peer_device = conn_peer_device(connection, pi->vnr);
5747        if (!peer_device)
5748                return -EIO;
5749        device = peer_device->device;
5750
5751        update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5752
5753        if (p->block_id == ID_SYNCER) {
5754                drbd_set_in_sync(device, sector, blksize);
5755                dec_rs_pending(device);
5756                return 0;
5757        }
5758        switch (pi->cmd) {
5759        case P_RS_WRITE_ACK:
5760                what = WRITE_ACKED_BY_PEER_AND_SIS;
5761                break;
5762        case P_WRITE_ACK:
5763                what = WRITE_ACKED_BY_PEER;
5764                break;
5765        case P_RECV_ACK:
5766                what = RECV_ACKED_BY_PEER;
5767                break;
5768        case P_SUPERSEDED:
5769                what = CONFLICT_RESOLVED;
5770                break;
5771        case P_RETRY_WRITE:
5772                what = POSTPONE_WRITE;
5773                break;
5774        default:
5775                BUG();
5776        }
5777
5778        return validate_req_change_req_state(device, p->block_id, sector,
5779                                             &device->write_requests, __func__,
5780                                             what, false);
5781}
5782
5783static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5784{
5785        struct drbd_peer_device *peer_device;
5786        struct drbd_device *device;
5787        struct p_block_ack *p = pi->data;
5788        sector_t sector = be64_to_cpu(p->sector);
5789        int size = be32_to_cpu(p->blksize);
5790        int err;
5791
5792        peer_device = conn_peer_device(connection, pi->vnr);
5793        if (!peer_device)
5794                return -EIO;
5795        device = peer_device->device;
5796
5797        update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5798
5799        if (p->block_id == ID_SYNCER) {
5800                dec_rs_pending(device);
5801                drbd_rs_failed_io(device, sector, size);
5802                return 0;
5803        }
5804
5805        err = validate_req_change_req_state(device, p->block_id, sector,
5806                                            &device->write_requests, __func__,
5807                                            NEG_ACKED, true);
5808        if (err) {
5809                /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5810                   The master bio might already be completed, therefore the
5811                   request is no longer in the collision hash. */
5812                /* In Protocol B we might already have got a P_RECV_ACK
5813                   but then get a P_NEG_ACK afterwards. */
5814                drbd_set_out_of_sync(device, sector, size);
5815        }
5816        return 0;
5817}
5818
5819static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5820{
5821        struct drbd_peer_device *peer_device;
5822        struct drbd_device *device;
5823        struct p_block_ack *p = pi->data;
5824        sector_t sector = be64_to_cpu(p->sector);
5825
5826        peer_device = conn_peer_device(connection, pi->vnr);
5827        if (!peer_device)
5828                return -EIO;
5829        device = peer_device->device;
5830
5831        update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5832
5833        drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5834            (unsigned long long)sector, be32_to_cpu(p->blksize));
5835
5836        return validate_req_change_req_state(device, p->block_id, sector,
5837                                             &device->read_requests, __func__,
5838                                             NEG_ACKED, false);
5839}
5840
5841static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5842{
5843        struct drbd_peer_device *peer_device;
5844        struct drbd_device *device;
5845        sector_t sector;
5846        int size;
5847        struct p_block_ack *p = pi->data;
5848
5849        peer_device = conn_peer_device(connection, pi->vnr);
5850        if (!peer_device)
5851                return -EIO;
5852        device = peer_device->device;
5853
5854        sector = be64_to_cpu(p->sector);
5855        size = be32_to_cpu(p->blksize);
5856
5857        update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5858
5859        dec_rs_pending(device);
5860
5861        if (get_ldev_if_state(device, D_FAILED)) {
5862                drbd_rs_complete_io(device, sector);
5863                switch (pi->cmd) {
5864                case P_NEG_RS_DREPLY:
5865                        drbd_rs_failed_io(device, sector, size);
5866                case P_RS_CANCEL:
5867                        break;
5868                default:
5869                        BUG();
5870                }
5871                put_ldev(device);
5872        }
5873
5874        return 0;
5875}
5876
5877static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5878{
5879        struct p_barrier_ack *p = pi->data;
5880        struct drbd_peer_device *peer_device;
5881        int vnr;
5882
5883        tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5884
5885        rcu_read_lock();
5886        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5887                struct drbd_device *device = peer_device->device;
5888
5889                if (device->state.conn == C_AHEAD &&
5890                    atomic_read(&device->ap_in_flight) == 0 &&
5891                    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5892                        device->start_resync_timer.expires = jiffies + HZ;
5893                        add_timer(&device->start_resync_timer);
5894                }
5895        }
5896        rcu_read_unlock();
5897
5898        return 0;
5899}
5900
5901static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5902{
5903        struct drbd_peer_device *peer_device;
5904        struct drbd_device *device;
5905        struct p_block_ack *p = pi->data;
5906        struct drbd_device_work *dw;
5907        sector_t sector;
5908        int size;
5909
5910        peer_device = conn_peer_device(connection, pi->vnr);
5911        if (!peer_device)
5912                return -EIO;
5913        device = peer_device->device;
5914
5915        sector = be64_to_cpu(p->sector);
5916        size = be32_to_cpu(p->blksize);
5917
5918        update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5919
5920        if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5921                drbd_ov_out_of_sync_found(device, sector, size);
5922        else
5923                ov_out_of_sync_print(device);
5924
5925        if (!get_ldev(device))
5926                return 0;
5927
5928        drbd_rs_complete_io(device, sector);
5929        dec_rs_pending(device);
5930
5931        --device->ov_left;
5932
5933        /* let's advance progress step marks only for every other megabyte */
5934        if ((device->ov_left & 0x200) == 0x200)
5935                drbd_advance_rs_marks(device, device->ov_left);
5936
5937        if (device->ov_left == 0) {
5938                dw = kmalloc(sizeof(*dw), GFP_NOIO);
5939                if (dw) {
5940                        dw->w.cb = w_ov_finished;
5941                        dw->device = device;
5942                        drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5943                } else {
5944                        drbd_err(device, "kmalloc(dw) failed.");
5945                        ov_out_of_sync_print(device);
5946                        drbd_resync_finished(device);
5947                }
5948        }
5949        put_ldev(device);
5950        return 0;
5951}
5952
5953static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
5954{
5955        return 0;
5956}
5957
5958struct meta_sock_cmd {
5959        size_t pkt_size;
5960        int (*fn)(struct drbd_connection *connection, struct packet_info *);
5961};
5962
5963static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
5964{
5965        long t;
5966        struct net_conf *nc;
5967
5968        rcu_read_lock();
5969        nc = rcu_dereference(connection->net_conf);
5970        t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5971        rcu_read_unlock();
5972
5973        t *= HZ;
5974        if (ping_timeout)
5975                t /= 10;
5976
5977        connection->meta.socket->sk->sk_rcvtimeo = t;
5978}
5979
5980static void set_ping_timeout(struct drbd_connection *connection)
5981{
5982        set_rcvtimeo(connection, 1);
5983}
5984
5985static void set_idle_timeout(struct drbd_connection *connection)
5986{
5987        set_rcvtimeo(connection, 0);
5988}
5989
5990static struct meta_sock_cmd ack_receiver_tbl[] = {
5991        [P_PING]            = { 0, got_Ping },
5992        [P_PING_ACK]        = { 0, got_PingAck },
5993        [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5994        [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5995        [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5996        [P_SUPERSEDED]   = { sizeof(struct p_block_ack), got_BlockAck },
5997        [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5998        [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5999        [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
6000        [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
6001        [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
6002        [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
6003        [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
6004        [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
6005        [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
6006        [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
6007        [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
6008};
6009
6010int drbd_ack_receiver(struct drbd_thread *thi)
6011{
6012        struct drbd_connection *connection = thi->connection;
6013        struct meta_sock_cmd *cmd = NULL;
6014        struct packet_info pi;
6015        unsigned long pre_recv_jif;
6016        int rv;
6017        void *buf    = connection->meta.rbuf;
6018        int received = 0;
6019        unsigned int header_size = drbd_header_size(connection);
6020        int expect   = header_size;
6021        bool ping_timeout_active = false;
6022        struct sched_param param = { .sched_priority = 2 };
6023
6024        rv = sched_setscheduler(current, SCHED_RR, &param);
6025        if (rv < 0)
6026                drbd_err(connection, "drbd_ack_receiver: ERROR set priority, ret=%d\n", rv);
6027
6028        while (get_t_state(thi) == RUNNING) {
6029                drbd_thread_current_set_cpu(thi);
6030
6031                conn_reclaim_net_peer_reqs(connection);
6032
6033                if (test_and_clear_bit(SEND_PING, &connection->flags)) {
6034                        if (drbd_send_ping(connection)) {
6035                                drbd_err(connection, "drbd_send_ping has failed\n");
6036                                goto reconnect;
6037                        }
6038                        set_ping_timeout(connection);
6039                        ping_timeout_active = true;
6040                }
6041
6042                pre_recv_jif = jiffies;
6043                rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
6044
6045                /* Note:
6046                 * -EINTR        (on meta) we got a signal
6047                 * -EAGAIN       (on meta) rcvtimeo expired
6048                 * -ECONNRESET   other side closed the connection
6049                 * -ERESTARTSYS  (on data) we got a signal
6050                 * rv <  0       other than above: unexpected error!
6051                 * rv == expected: full header or command
6052                 * rv <  expected: "woken" by signal during receive
6053                 * rv == 0       : "connection shut down by peer"
6054                 */
6055                if (likely(rv > 0)) {
6056                        received += rv;
6057                        buf      += rv;
6058                } else if (rv == 0) {
6059                        if (test_bit(DISCONNECT_SENT, &connection->flags)) {
6060                                long t;
6061                                rcu_read_lock();
6062                                t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
6063                                rcu_read_unlock();
6064
6065                                t = wait_event_timeout(connection->ping_wait,
6066                                                       connection->cstate < C_WF_REPORT_PARAMS,
6067                                                       t);
6068                                if (t)
6069                                        break;
6070                        }
6071                        drbd_err(connection, "meta connection shut down by peer.\n");
6072                        goto reconnect;
6073                } else if (rv == -EAGAIN) {
6074                        /* If the data socket received something meanwhile,
6075                         * that is good enough: peer is still alive. */
6076                        if (time_after(connection->last_received, pre_recv_jif))
6077                                continue;
6078                        if (ping_timeout_active) {
6079                                drbd_err(connection, "PingAck did not arrive in time.\n");
6080                                goto reconnect;
6081                        }
6082                        set_bit(SEND_PING, &connection->flags);
6083                        continue;
6084                } else if (rv == -EINTR) {
6085                        /* maybe drbd_thread_stop(): the while condition will notice.
6086                         * maybe woken for send_ping: we'll send a ping above,
6087                         * and change the rcvtimeo */
6088                        flush_signals(current);
6089                        continue;
6090                } else {
6091                        drbd_err(connection, "sock_recvmsg returned %d\n", rv);
6092                        goto reconnect;
6093                }
6094
6095                if (received == expect && cmd == NULL) {
6096                        if (decode_header(connection, connection->meta.rbuf, &pi))
6097                                goto reconnect;
6098                        cmd = &ack_receiver_tbl[pi.cmd];
6099                        if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
6100                                drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
6101                                         cmdname(pi.cmd), pi.cmd);
6102                                goto disconnect;
6103                        }
6104                        expect = header_size + cmd->pkt_size;
6105                        if (pi.size != expect - header_size) {
6106                                drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
6107                                        pi.cmd, pi.size);
6108                                goto reconnect;
6109                        }
6110                }
6111                if (received == expect) {
6112                        bool err;
6113
6114                        err = cmd->fn(connection, &pi);
6115                        if (err) {
6116                                drbd_err(connection, "%ps failed\n", cmd->fn);
6117                                goto reconnect;
6118                        }
6119
6120                        connection->last_received = jiffies;
6121
6122                        if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
6123                                set_idle_timeout(connection);
6124                                ping_timeout_active = false;
6125                        }
6126
6127                        buf      = connection->meta.rbuf;
6128                        received = 0;
6129                        expect   = header_size;
6130                        cmd      = NULL;
6131                }
6132        }
6133
6134        if (0) {
6135reconnect:
6136                conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6137                conn_md_sync(connection);
6138        }
6139        if (0) {
6140disconnect:
6141                conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
6142        }
6143
6144        drbd_info(connection, "ack_receiver terminated\n");
6145
6146        return 0;
6147}
6148
6149void drbd_send_acks_wf(struct work_struct *ws)
6150{
6151        struct drbd_peer_device *peer_device =
6152                container_of(ws, struct drbd_peer_device, send_acks_work);
6153        struct drbd_connection *connection = peer_device->connection;
6154        struct drbd_device *device = peer_device->device;
6155        struct net_conf *nc;
6156        int tcp_cork, err;
6157
6158        rcu_read_lock();
6159        nc = rcu_dereference(connection->net_conf);
6160        tcp_cork = nc->tcp_cork;
6161        rcu_read_unlock();
6162
6163        if (tcp_cork)
6164                drbd_tcp_cork(connection->meta.socket);
6165
6166        err = drbd_finish_peer_reqs(device);
6167        kref_put(&device->kref, drbd_destroy_device);
6168        /* get is in drbd_endio_write_sec_final(). That is necessary to keep the
6169           struct work_struct send_acks_work alive, which is in the peer_device object */
6170
6171        if (err) {
6172                conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6173                return;
6174        }
6175
6176        if (tcp_cork)
6177                drbd_tcp_uncork(connection->meta.socket);
6178
6179        return;
6180}
6181