linux/drivers/block/drbd/drbd_receiver.c
/*
   drbd_receiver.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */


#include <linux/module.h>

#include <asm/uaccess.h>
#include <net/sock.h>

#include <linux/drbd.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include "drbd_int.h"
#include "drbd_protocol.h"
#include "drbd_req.h"
#include "drbd_vli.h"

#define PRO_FEATURES (FF_TRIM)

struct packet_info {
        enum drbd_packet cmd;
        unsigned int size;
        unsigned int vnr;
        void *data;
};

enum finish_epoch {
        FE_STILL_LIVE,
        FE_DESTROYED,
        FE_RECYCLED,
};

static int drbd_do_features(struct drbd_connection *connection);
static int drbd_do_auth(struct drbd_connection *connection);
static int drbd_disconnected(struct drbd_peer_device *);
static void conn_wait_active_ee_empty(struct drbd_connection *connection);
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_work *, int);


#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)

  76/*
  77 * some helper functions to deal with single linked page lists,
  78 * page->private being our "next" pointer.
  79 */

/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
{
        struct page *page;
        struct page *tmp;

        BUG_ON(!n);
        BUG_ON(!head);

        page = *head;

        if (!page)
                return NULL;

        while (page) {
                tmp = page_chain_next(page);
                if (--n == 0)
                        break; /* found sufficient pages */
                if (tmp == NULL)
                        /* insufficient pages, don't use any of them. */
                        return NULL;
                page = tmp;
        }

        /* add end of list marker for the returned list */
        set_page_private(page, 0);
        /* actual return value, and adjustment of head */
        page = *head;
        *head = tmp;
        return page;
}

/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
{
        struct page *tmp;
        int i = 1;
        while ((tmp = page_chain_next(page)))
                ++i, page = tmp;
        if (len)
                *len = i;
        return page;
}

static int page_chain_free(struct page *page)
{
        struct page *tmp;
        int i = 0;
        page_chain_for_each_safe(page, tmp) {
                put_page(page);
                ++i;
        }
        return i;
}

static void page_chain_add(struct page **head,
                struct page *chain_first, struct page *chain_last)
{
#if 1
        struct page *tmp;
        tmp = page_chain_tail(chain_first, NULL);
        BUG_ON(tmp != chain_last);
#endif

        /* add chain to head */
        set_page_private(chain_last, (unsigned long)*head);
        *head = chain_first;
}
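
/* Illustrative sketch (not part of the driver): how the chain helpers
 * above compose.  A short "private" chain is threaded through
 * page->private, its tail is found outside of any lock, and the chain
 * is then spliced onto a shared head under the pool spinlock, just as
 * __drbd_alloc_pages() and drbd_free_pages() below do.
 * page_chain_next() comes from drbd_int.h; error handling is elided
 * and the function name is made up. */
#if 0
static void page_chain_usage_sketch(struct page **pool_head)
{
        struct page *p1 = alloc_page(GFP_TRY);
        struct page *p2 = alloc_page(GFP_TRY);
        struct page *tail;

        if (!p1 || !p2)
                return;

        /* build p1 -> p2 -> end: private == 0 marks the end of the chain */
        set_page_private(p2, 0);
        set_page_private(p1, (unsigned long)p2);

        tail = page_chain_tail(p1, NULL);       /* outside the lock */

        spin_lock(&drbd_pp_lock);
        page_chain_add(pool_head, p1, tail);
        spin_unlock(&drbd_pp_lock);
}
#endif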

static struct page *__drbd_alloc_pages(struct drbd_device *device,
                                       unsigned int number)
{
        struct page *page = NULL;
        struct page *tmp = NULL;
        unsigned int i = 0;

        /* Yes, testing drbd_pp_vacant outside the lock is racy.
         * So what. It saves a spin_lock. */
        if (drbd_pp_vacant >= number) {
                spin_lock(&drbd_pp_lock);
                page = page_chain_del(&drbd_pp_pool, number);
                if (page)
                        drbd_pp_vacant -= number;
                spin_unlock(&drbd_pp_lock);
                if (page)
                        return page;
        }

        /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
         * "criss-cross" setup, that might cause write-out on some other DRBD,
         * which in turn might block on the other node at this very place.  */
        for (i = 0; i < number; i++) {
                tmp = alloc_page(GFP_TRY);
                if (!tmp)
                        break;
                set_page_private(tmp, (unsigned long)page);
                page = tmp;
        }

        if (i == number)
                return page;

        /* Not enough pages immediately available this time.
         * No need to jump around here, drbd_alloc_pages will retry this
         * function "soon". */
        if (page) {
                tmp = page_chain_tail(page, NULL);
                spin_lock(&drbd_pp_lock);
                page_chain_add(&drbd_pp_pool, page, tmp);
                drbd_pp_vacant += i;
                spin_unlock(&drbd_pp_lock);
        }
        return NULL;
}

static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
                                           struct list_head *to_be_freed)
{
        struct drbd_peer_request *peer_req, *tmp;

        /* The EEs are always appended to the end of the list. Since
           they are sent in order over the wire, they have to finish
           in order. As soon as we see the first unfinished one, we
           can stop examining the list... */

        list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
                if (drbd_peer_req_has_active_page(peer_req))
                        break;
                list_move(&peer_req->w.list, to_be_freed);
        }
}

static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
{
        LIST_HEAD(reclaimed);
        struct drbd_peer_request *peer_req, *t;

        spin_lock_irq(&device->resource->req_lock);
        reclaim_finished_net_peer_reqs(device, &reclaimed);
        spin_unlock_irq(&device->resource->req_lock);
        list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
                drbd_free_net_peer_req(device, peer_req);
}

static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
{
        struct drbd_peer_device *peer_device;
        int vnr;

        rcu_read_lock();
        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
                struct drbd_device *device = peer_device->device;
                if (!atomic_read(&device->pp_in_use_by_net))
                        continue;

                kref_get(&device->kref);
                rcu_read_unlock();
                drbd_reclaim_net_peer_reqs(device);
                kref_put(&device->kref, drbd_destroy_device);
                rcu_read_lock();
        }
        rcu_read_unlock();
}

/**
 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @peer_device: DRBD peer device.
 * @number:     number of pages requested
 * @retry:      whether to retry, if not enough pages are available right now
 *
 * Tries to allocate @number pages, first from our own page pool, then from
 * the kernel.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * If this allocation would exceed the max_buffers setting, we throttle
 * allocation (schedule_timeout) to give the system some room to breathe.
 *
 * We do not use max-buffers as hard limit, because it could lead to
 * congestion and further to a distributed deadlock during online-verify or
 * (checksum based) resync, if the max-buffers, socket buffer sizes and
 * resync-rate settings are misconfigured.
 *
 * Returns a page chain linked via page->private.
 */
struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
                              bool retry)
{
        struct drbd_device *device = peer_device->device;
        struct page *page = NULL;
        struct net_conf *nc;
        DEFINE_WAIT(wait);
        unsigned int mxb;

        rcu_read_lock();
        nc = rcu_dereference(peer_device->connection->net_conf);
        mxb = nc ? nc->max_buffers : 1000000;
        rcu_read_unlock();

        if (atomic_read(&device->pp_in_use) < mxb)
                page = __drbd_alloc_pages(device, number);

        /* Try to keep the fast path fast, but occasionally we need
         * to reclaim the pages we lent to the network stack. */
        if (page && atomic_read(&device->pp_in_use_by_net) > 512)
                drbd_reclaim_net_peer_reqs(device);

        while (page == NULL) {
                prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

                drbd_reclaim_net_peer_reqs(device);

                if (atomic_read(&device->pp_in_use) < mxb) {
                        page = __drbd_alloc_pages(device, number);
                        if (page)
                                break;
                }

                if (!retry)
                        break;

                if (signal_pending(current)) {
                        drbd_warn(device, "drbd_alloc_pages interrupted!\n");
                        break;
                }

                if (schedule_timeout(HZ/10) == 0)
                        mxb = UINT_MAX;
        }
        finish_wait(&drbd_pp_wait, &wait);

        if (page)
                atomic_add(number, &device->pp_in_use);
        return page;
}
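
/* Illustrative sketch (not part of the driver): the typical pairing of
 * drbd_alloc_pages() with drbd_free_pages() below.  A receive path
 * allocates a chain for an incoming payload and releases it when done,
 * which keeps pp_in_use balanced.  The function name and surrounding
 * logic are made up for illustration. */
#if 0
static int alloc_pages_usage_sketch(struct drbd_peer_device *peer_device,
                                    unsigned int data_size)
{
        struct drbd_device *device = peer_device->device;
        unsigned int nr_pages = (data_size + PAGE_SIZE - 1) >> PAGE_SHIFT;
        struct page *page;

        /* retry == true: sleep in HZ/10 steps until the pool can satisfy
         * us; with retry set, NULL is only returned on a pending signal */
        page = drbd_alloc_pages(peer_device, nr_pages, true);
        if (!page)
                return -EINTR;

        /* ... fill the chain from the socket and submit the request ... */

        drbd_free_pages(device, page, 0 /* not accounted to the net */);
        return 0;
}
#endif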

/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * It is also used from inside another spin_lock_irq(&resource->req_lock);
 * it either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
{
        atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
        int i;

        if (page == NULL)
                return;

        if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
                i = page_chain_free(page);
        else {
                struct page *tmp;
                tmp = page_chain_tail(page, &i);
                spin_lock(&drbd_pp_lock);
                page_chain_add(&drbd_pp_pool, page, tmp);
                drbd_pp_vacant += i;
                spin_unlock(&drbd_pp_lock);
        }
        i = atomic_sub_return(i, a);
        if (i < 0)
                drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
                        is_net ? "pp_in_use_by_net" : "pp_in_use", i);
        wake_up(&drbd_pp_wait);
}

/*
You need to hold the req_lock:
 _drbd_wait_ee_list_empty()

You must not have the req_lock:
 drbd_free_peer_req()
 drbd_alloc_peer_req()
 drbd_free_peer_reqs()
 drbd_ee_fix_bhs()
 drbd_finish_peer_reqs()
 drbd_clear_done_ee()
 drbd_wait_ee_list_empty()
*/

struct drbd_peer_request *
drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
                    unsigned int data_size, bool has_payload, gfp_t gfp_mask) __must_hold(local)
{
        struct drbd_device *device = peer_device->device;
        struct drbd_peer_request *peer_req;
        struct page *page = NULL;
        unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;

        if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
                return NULL;

        peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
        if (!peer_req) {
                if (!(gfp_mask & __GFP_NOWARN))
                        drbd_err(device, "%s: allocation failed\n", __func__);
                return NULL;
        }

        if (has_payload && data_size) {
                page = drbd_alloc_pages(peer_device, nr_pages,
                                        gfpflags_allow_blocking(gfp_mask));
                if (!page)
                        goto fail;
        }

        memset(peer_req, 0, sizeof(*peer_req));
        INIT_LIST_HEAD(&peer_req->w.list);
        drbd_clear_interval(&peer_req->i);
        peer_req->i.size = data_size;
        peer_req->i.sector = sector;
        peer_req->submit_jif = jiffies;
        peer_req->peer_device = peer_device;
        peer_req->pages = page;
        /*
         * The block_id is opaque to the receiver.  It is not converted to
         * host byte order, and is sent back to the sender unchanged.
         */
        peer_req->block_id = id;

        return peer_req;

 fail:
        mempool_free(peer_req, drbd_ee_mempool);
        return NULL;
}

void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
                       int is_net)
{
        might_sleep();
        if (peer_req->flags & EE_HAS_DIGEST)
                kfree(peer_req->digest);
        drbd_free_pages(device, peer_req->pages, is_net);
        D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
        D_ASSERT(device, drbd_interval_empty(&peer_req->i));
        if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
                peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
                drbd_al_complete_io(device, &peer_req->i);
        }
        mempool_free(peer_req, drbd_ee_mempool);
}

int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
{
        LIST_HEAD(work_list);
        struct drbd_peer_request *peer_req, *t;
        int count = 0;
        int is_net = list == &device->net_ee;

        spin_lock_irq(&device->resource->req_lock);
        list_splice_init(list, &work_list);
        spin_unlock_irq(&device->resource->req_lock);

        list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
                __drbd_free_peer_req(device, peer_req, is_net);
                count++;
        }
        return count;
}

/*
 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
 */
static int drbd_finish_peer_reqs(struct drbd_device *device)
{
        LIST_HEAD(work_list);
        LIST_HEAD(reclaimed);
        struct drbd_peer_request *peer_req, *t;
        int err = 0;

        spin_lock_irq(&device->resource->req_lock);
        reclaim_finished_net_peer_reqs(device, &reclaimed);
        list_splice_init(&device->done_ee, &work_list);
        spin_unlock_irq(&device->resource->req_lock);

        list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
                drbd_free_net_peer_req(device, peer_req);

        /* possible callbacks here:
         * e_end_block, and e_end_resync_block, e_send_superseded.
         * all ignore the last argument.
         */
        list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
                int err2;

                /* list_del not necessary, next/prev members not touched */
                err2 = peer_req->w.cb(&peer_req->w, !!err);
                if (!err)
                        err = err2;
                drbd_free_peer_req(device, peer_req);
        }
        wake_up(&device->ee_wait);

        return err;
}

static void _drbd_wait_ee_list_empty(struct drbd_device *device,
                                     struct list_head *head)
{
        DEFINE_WAIT(wait);

        /* avoids spin_lock/unlock
         * and calling prepare_to_wait in the fast path */
        while (!list_empty(head)) {
                prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
                spin_unlock_irq(&device->resource->req_lock);
                io_schedule();
                finish_wait(&device->ee_wait, &wait);
                spin_lock_irq(&device->resource->req_lock);
        }
}

static void drbd_wait_ee_list_empty(struct drbd_device *device,
                                    struct list_head *head)
{
        spin_lock_irq(&device->resource->req_lock);
        _drbd_wait_ee_list_empty(device, head);
        spin_unlock_irq(&device->resource->req_lock);
}

static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
{
        struct kvec iov = {
                .iov_base = buf,
                .iov_len = size,
        };
        struct msghdr msg = {
                .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
        };
        return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
}

static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
{
        int rv;

        rv = drbd_recv_short(connection->data.socket, buf, size, 0);

        if (rv < 0) {
                if (rv == -ECONNRESET)
                        drbd_info(connection, "sock was reset by peer\n");
                else if (rv != -ERESTARTSYS)
                        drbd_err(connection, "sock_recvmsg returned %d\n", rv);
        } else if (rv == 0) {
                if (test_bit(DISCONNECT_SENT, &connection->flags)) {
                        long t;
                        rcu_read_lock();
                        t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
                        rcu_read_unlock();

                        t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);

                        if (t)
                                goto out;
                }
                drbd_info(connection, "sock was shut down by peer\n");
        }

        if (rv != size)
                conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);

out:
        return rv;
}

static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
{
        int err;

        err = drbd_recv(connection, buf, size);
        if (err != size) {
                if (err >= 0)
                        err = -EIO;
        } else
                err = 0;
        return err;
}

static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
{
        int err;

        err = drbd_recv_all(connection, buf, size);
        if (err && !signal_pending(current))
                drbd_warn(connection, "short read (expected size %d)\n", (int)size);
        return err;
}

/* quoting tcp(7):
 *   On individual connections, the socket buffer size must be set prior to the
 *   listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
                unsigned int rcv)
{
        /* open coded SO_SNDBUF, SO_RCVBUF */
        if (snd) {
                sock->sk->sk_sndbuf = snd;
                sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
        }
        if (rcv) {
                sock->sk->sk_rcvbuf = rcv;
                sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
        }
}

static struct socket *drbd_try_connect(struct drbd_connection *connection)
{
        const char *what;
        struct socket *sock;
        struct sockaddr_in6 src_in6;
        struct sockaddr_in6 peer_in6;
        struct net_conf *nc;
        int err, peer_addr_len, my_addr_len;
        int sndbuf_size, rcvbuf_size, connect_int;
        int disconnect_on_error = 1;

        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);
        if (!nc) {
                rcu_read_unlock();
                return NULL;
        }
        sndbuf_size = nc->sndbuf_size;
        rcvbuf_size = nc->rcvbuf_size;
        connect_int = nc->connect_int;
        rcu_read_unlock();

        my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
        memcpy(&src_in6, &connection->my_addr, my_addr_len);

        if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
                src_in6.sin6_port = 0;
        else
                ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

        peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
        memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);

        what = "sock_create_kern";
        err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
                               SOCK_STREAM, IPPROTO_TCP, &sock);
        if (err < 0) {
                sock = NULL;
                goto out;
        }

        sock->sk->sk_rcvtimeo =
        sock->sk->sk_sndtimeo = connect_int * HZ;
        drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);

        /* explicitly bind to the configured IP as source IP
         * for the outgoing connections.
         * This is needed for multihomed hosts and to be
         * able to use lo: interfaces for drbd.
         * Make sure to use 0 as port number, so Linux selects
         * a free one dynamically.
         */
        what = "bind before connect";
        err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
        if (err < 0)
                goto out;

        /* connect may fail, peer not yet available.
         * stay C_WF_CONNECTION, don't go Disconnecting! */
        disconnect_on_error = 0;
        what = "connect";
        err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);

out:
        if (err < 0) {
                if (sock) {
                        sock_release(sock);
                        sock = NULL;
                }
                switch (-err) {
                        /* timeout, busy, signal pending */
                case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
                case EINTR: case ERESTARTSYS:
                        /* peer not (yet) available, network problem */
                case ECONNREFUSED: case ENETUNREACH:
                case EHOSTDOWN:    case EHOSTUNREACH:
                        disconnect_on_error = 0;
                        break;
                default:
                        drbd_err(connection, "%s failed, err = %d\n", what, err);
                }
                if (disconnect_on_error)
                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
        }

        return sock;
}

struct accept_wait_data {
        struct drbd_connection *connection;
        struct socket *s_listen;
        struct completion door_bell;
        void (*original_sk_state_change)(struct sock *sk);
};

static void drbd_incoming_connection(struct sock *sk)
{
        struct accept_wait_data *ad = sk->sk_user_data;
        void (*state_change)(struct sock *sk);

        state_change = ad->original_sk_state_change;
        if (sk->sk_state == TCP_ESTABLISHED)
                complete(&ad->door_bell);
        state_change(sk);
}

static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
{
        int err, sndbuf_size, rcvbuf_size, my_addr_len;
        struct sockaddr_in6 my_addr;
        struct socket *s_listen;
        struct net_conf *nc;
        const char *what;

        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);
        if (!nc) {
                rcu_read_unlock();
                return -EIO;
        }
        sndbuf_size = nc->sndbuf_size;
        rcvbuf_size = nc->rcvbuf_size;
        rcu_read_unlock();

        my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
        memcpy(&my_addr, &connection->my_addr, my_addr_len);

        what = "sock_create_kern";
        err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
                               SOCK_STREAM, IPPROTO_TCP, &s_listen);
        if (err) {
                s_listen = NULL;
                goto out;
        }

        s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
        drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);

        what = "bind before listen";
        err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
        if (err < 0)
                goto out;

        ad->s_listen = s_listen;
        write_lock_bh(&s_listen->sk->sk_callback_lock);
        ad->original_sk_state_change = s_listen->sk->sk_state_change;
        s_listen->sk->sk_state_change = drbd_incoming_connection;
        s_listen->sk->sk_user_data = ad;
        write_unlock_bh(&s_listen->sk->sk_callback_lock);

        what = "listen";
        err = s_listen->ops->listen(s_listen, 5);
        if (err < 0)
                goto out;

        return 0;
out:
        if (s_listen)
                sock_release(s_listen);
        if (err < 0) {
                if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
                        drbd_err(connection, "%s failed, err = %d\n", what, err);
                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
                }
        }

        return -EIO;
}

static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
{
        write_lock_bh(&sk->sk_callback_lock);
        sk->sk_state_change = ad->original_sk_state_change;
        sk->sk_user_data = NULL;
        write_unlock_bh(&sk->sk_callback_lock);
}

static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
{
        int timeo, connect_int, err = 0;
        struct socket *s_estab = NULL;
        struct net_conf *nc;

        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);
        if (!nc) {
                rcu_read_unlock();
                return NULL;
        }
        connect_int = nc->connect_int;
        rcu_read_unlock();

        timeo = connect_int * HZ;
        /* 28.5% random jitter */
        timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;

        err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
        if (err <= 0)
                return NULL;

        err = kernel_accept(ad->s_listen, &s_estab, 0);
        if (err < 0) {
                if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
                        drbd_err(connection, "accept failed, err = %d\n", err);
                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
                }
        }

        if (s_estab)
                unregister_state_change(s_estab->sk, ad);

        return s_estab;
}

static int decode_header(struct drbd_connection *, void *, struct packet_info *);

static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
                             enum drbd_packet cmd)
{
        if (!conn_prepare_command(connection, sock))
                return -EIO;
        return conn_send_command(connection, sock, cmd, 0, NULL, 0);
}

static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
{
        unsigned int header_size = drbd_header_size(connection);
        struct packet_info pi;
        struct net_conf *nc;
        int err;

        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);
        if (!nc) {
                rcu_read_unlock();
                return -EIO;
        }
        sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
        rcu_read_unlock();

        err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
        if (err != header_size) {
                if (err >= 0)
                        err = -EIO;
                return err;
        }
        err = decode_header(connection, connection->data.rbuf, &pi);
        if (err)
                return err;
        return pi.cmd;
}

/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @sock:       pointer to the pointer to the socket.
 */
static bool drbd_socket_okay(struct socket **sock)
{
        int rr;
        char tb[4];

        if (!*sock)
                return false;

        rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

        if (rr > 0 || rr == -EAGAIN) {
                return true;
        } else {
                sock_release(*sock);
                *sock = NULL;
                return false;
        }
}

static bool connection_established(struct drbd_connection *connection,
                                   struct socket **sock1,
                                   struct socket **sock2)
{
        struct net_conf *nc;
        int timeout;
        bool ok;

        if (!*sock1 || !*sock2)
                return false;

        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);
        timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
        rcu_read_unlock();
        schedule_timeout_interruptible(timeout);

        ok = drbd_socket_okay(sock1);
        ok = drbd_socket_okay(sock2) && ok;

        return ok;
}

/* Gets called if a connection is established, or if a new minor gets created
   in a connection */
int drbd_connected(struct drbd_peer_device *peer_device)
{
        struct drbd_device *device = peer_device->device;
        int err;

        atomic_set(&device->packet_seq, 0);
        device->peer_seq = 0;

        device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
                &peer_device->connection->cstate_mutex :
                &device->own_state_mutex;

        err = drbd_send_sync_param(peer_device);
        if (!err)
                err = drbd_send_sizes(peer_device, 0, 0);
        if (!err)
                err = drbd_send_uuids(peer_device);
        if (!err)
                err = drbd_send_current_state(peer_device);
        clear_bit(USE_DEGR_WFC_T, &device->flags);
        clear_bit(RESIZE_PENDING, &device->flags);
        atomic_set(&device->ap_in_flight, 0);
        mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
        return err;
}

/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 * (An illustrative caller honoring this contract follows the function.)
 */
static int conn_connect(struct drbd_connection *connection)
{
        struct drbd_socket sock, msock;
        struct drbd_peer_device *peer_device;
        struct net_conf *nc;
        int vnr, timeout, h;
        bool discard_my_data, ok;
        enum drbd_state_rv rv;
        struct accept_wait_data ad = {
                .connection = connection,
                .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
        };

        clear_bit(DISCONNECT_SENT, &connection->flags);
        if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
                return -2;

        mutex_init(&sock.mutex);
        sock.sbuf = connection->data.sbuf;
        sock.rbuf = connection->data.rbuf;
        sock.socket = NULL;
        mutex_init(&msock.mutex);
        msock.sbuf = connection->meta.sbuf;
        msock.rbuf = connection->meta.rbuf;
        msock.socket = NULL;

        /* Assume that the peer only understands protocol 80 until we know better.  */
        connection->agreed_pro_version = 80;

        if (prepare_listen_socket(connection, &ad))
                return 0;

        do {
                struct socket *s;

                s = drbd_try_connect(connection);
                if (s) {
                        if (!sock.socket) {
                                sock.socket = s;
                                send_first_packet(connection, &sock, P_INITIAL_DATA);
                        } else if (!msock.socket) {
                                clear_bit(RESOLVE_CONFLICTS, &connection->flags);
                                msock.socket = s;
                                send_first_packet(connection, &msock, P_INITIAL_META);
                        } else {
                                drbd_err(connection, "Logic error in conn_connect()\n");
                                goto out_release_sockets;
                        }
                }

                if (connection_established(connection, &sock.socket, &msock.socket))
                        break;

retry:
                s = drbd_wait_for_connect(connection, &ad);
                if (s) {
                        int fp = receive_first_packet(connection, s);
                        drbd_socket_okay(&sock.socket);
                        drbd_socket_okay(&msock.socket);
                        switch (fp) {
                        case P_INITIAL_DATA:
                                if (sock.socket) {
                                        drbd_warn(connection, "initial packet S crossed\n");
                                        sock_release(sock.socket);
                                        sock.socket = s;
                                        goto randomize;
                                }
                                sock.socket = s;
                                break;
                        case P_INITIAL_META:
                                set_bit(RESOLVE_CONFLICTS, &connection->flags);
                                if (msock.socket) {
                                        drbd_warn(connection, "initial packet M crossed\n");
                                        sock_release(msock.socket);
                                        msock.socket = s;
                                        goto randomize;
                                }
                                msock.socket = s;
                                break;
                        default:
                                drbd_warn(connection, "Error receiving initial packet\n");
                                sock_release(s);
randomize:
                                if (prandom_u32() & 1)
                                        goto retry;
                        }
                }

                if (connection->cstate <= C_DISCONNECTING)
                        goto out_release_sockets;
                if (signal_pending(current)) {
                        flush_signals(current);
                        smp_rmb();
                        if (get_t_state(&connection->receiver) == EXITING)
                                goto out_release_sockets;
                }

                ok = connection_established(connection, &sock.socket, &msock.socket);
        } while (!ok);

        if (ad.s_listen)
                sock_release(ad.s_listen);

        sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
        msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */

        sock.socket->sk->sk_allocation = GFP_NOIO;
        msock.socket->sk->sk_allocation = GFP_NOIO;

        sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
        msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;

        /* NOT YET ...
         * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
         * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
         * first set it to the P_CONNECTION_FEATURES timeout,
         * which we set to 4x the configured ping_timeout. */
        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);

        sock.socket->sk->sk_sndtimeo =
        sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;

        msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
        timeout = nc->timeout * HZ / 10;
        discard_my_data = nc->discard_my_data;
        rcu_read_unlock();

        msock.socket->sk->sk_sndtimeo = timeout;

        /* we don't want delays.
         * we use TCP_CORK where appropriate, though */
        drbd_tcp_nodelay(sock.socket);
        drbd_tcp_nodelay(msock.socket);

        connection->data.socket = sock.socket;
        connection->meta.socket = msock.socket;
        connection->last_received = jiffies;

        h = drbd_do_features(connection);
        if (h <= 0)
                return h;

        if (connection->cram_hmac_tfm) {
                /* drbd_request_state(device, NS(conn, WFAuth)); */
                switch (drbd_do_auth(connection)) {
                case -1:
                        drbd_err(connection, "Authentication of peer failed\n");
                        return -1;
                case 0:
                        drbd_err(connection, "Authentication of peer failed, trying again.\n");
                        return 0;
                }
        }

        connection->data.socket->sk->sk_sndtimeo = timeout;
        connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

        if (drbd_send_protocol(connection) == -EOPNOTSUPP)
                return -1;

        /* Prevent a race between resync-handshake and
         * being promoted to Primary.
         *
         * Grab and release the state mutex, so we know that any current
         * drbd_set_role() is finished, and any incoming drbd_set_role
         * will see the STATE_SENT flag, and wait for it to be cleared.
         */
        idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
                mutex_lock(peer_device->device->state_mutex);

        set_bit(STATE_SENT, &connection->flags);

        idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
                mutex_unlock(peer_device->device->state_mutex);

        rcu_read_lock();
        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
                struct drbd_device *device = peer_device->device;
                kref_get(&device->kref);
                rcu_read_unlock();

                if (discard_my_data)
                        set_bit(DISCARD_MY_DATA, &device->flags);
                else
                        clear_bit(DISCARD_MY_DATA, &device->flags);

                drbd_connected(peer_device);
                kref_put(&device->kref, drbd_destroy_device);
                rcu_read_lock();
        }
        rcu_read_unlock();

        rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
        if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
                clear_bit(STATE_SENT, &connection->flags);
                return 0;
        }

        drbd_thread_start(&connection->ack_receiver);
        /* opencoded create_singlethread_workqueue(),
         * to be able to use format string arguments */
        connection->ack_sender =
                alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
        if (!connection->ack_sender) {
                drbd_err(connection, "Failed to create workqueue ack_sender\n");
                return 0;
        }

        mutex_lock(&connection->resource->conf_update);
        /* The discard_my_data flag is a single-shot modifier to the next
         * connection attempt, the handshake of which is now well underway.
         * No need for rcu style copying of the whole struct
         * just to clear a single value. */
        connection->net_conf->discard_my_data = 0;
        mutex_unlock(&connection->resource->conf_update);

        return h;

out_release_sockets:
        if (ad.s_listen)
                sock_release(ad.s_listen);
        if (sock.socket)
                sock_release(sock.socket);
        if (msock.socket)
                sock_release(msock.socket);
        return -1;
}
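
/* Illustrative sketch (not part of the driver): a caller loop honoring
 * the return-value contract documented above conn_connect().  The real
 * receiver thread does something similar.  The function name and the
 * retry delay are made up for illustration. */
#if 0
static void conn_connect_caller_sketch(struct drbd_connection *connection)
{
        int h;

        do {
                h = conn_connect(connection);
                if (h == 0)     /* transient failure: just try again */
                        schedule_timeout_interruptible(HZ);
        } while (h == 0);

        if (h < 0) {
                /* -1: incompatible peer, -2: no network config;
                 * in both cases retrying is pointless, stay standalone */
                return;
        }

        /* h == 1: valid connection, enter the main receive loop */
}
#endif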

static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
{
        unsigned int header_size = drbd_header_size(connection);

        if (header_size == sizeof(struct p_header100) &&
            *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
                struct p_header100 *h = header;
                if (h->pad != 0) {
                        drbd_err(connection, "Header padding is not zero\n");
                        return -EINVAL;
                }
                pi->vnr = be16_to_cpu(h->volume);
                pi->cmd = be16_to_cpu(h->command);
                pi->size = be32_to_cpu(h->length);
        } else if (header_size == sizeof(struct p_header95) &&
                   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
                struct p_header95 *h = header;
                pi->cmd = be16_to_cpu(h->command);
                pi->size = be32_to_cpu(h->length);
                pi->vnr = 0;
        } else if (header_size == sizeof(struct p_header80) &&
                   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
                struct p_header80 *h = header;
                pi->cmd = be16_to_cpu(h->command);
                pi->size = be16_to_cpu(h->length);
                pi->vnr = 0;
        } else {
                drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
                         be32_to_cpu(*(__be32 *)header),
                         connection->agreed_pro_version);
                return -EINVAL;
        }
        pi->data = header + header_size;
        return 0;
}
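
/* Illustrative sketch (not part of the driver): the send-side inverse of
 * decode_header() for the protocol-100 on-the-wire header.  Only the
 * fields that decode_header() reads above are touched; the actual struct
 * layout lives in drbd_protocol.h, and the function name is made up. */
#if 0
static void encode_header100_sketch(struct p_header100 *h,
                                    u16 vnr, u16 cmd, u32 size)
{
        h->magic = cpu_to_be32(DRBD_MAGIC_100);
        h->volume = cpu_to_be16(vnr);
        h->command = cpu_to_be16(cmd);
        h->length = cpu_to_be32(size);
        h->pad = 0;     /* decode_header() rejects nonzero padding */
}
#endif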

static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
{
        void *buffer = connection->data.rbuf;
        int err;

        err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
        if (err)
                return err;

        err = decode_header(connection, buffer, pi);
        connection->last_received = jiffies;

        return err;
}

static void drbd_flush(struct drbd_connection *connection)
{
        int rv;
        struct drbd_peer_device *peer_device;
        int vnr;

        if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
                rcu_read_lock();
                idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
                        struct drbd_device *device = peer_device->device;

                        if (!get_ldev(device))
                                continue;
                        kref_get(&device->kref);
                        rcu_read_unlock();

                        /* Right now, we have only this one synchronous code path
                         * for flushes between request epochs.
                         * We may want to make those asynchronous,
                         * or at least parallelize the flushes to the volume devices.
                         */
                        device->flush_jif = jiffies;
                        set_bit(FLUSH_PENDING, &device->flags);
                        rv = blkdev_issue_flush(device->ldev->backing_bdev,
                                        GFP_NOIO, NULL);
                        clear_bit(FLUSH_PENDING, &device->flags);
                        if (rv) {
                                drbd_info(device, "local disk flush failed with status %d\n", rv);
                                /* would rather check on EOPNOTSUPP, but that is not reliable.
                                 * don't try again for ANY return value != 0
                                 * if (rv == -EOPNOTSUPP) */
                                drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
                        }
                        put_ldev(device);
                        kref_put(&device->kref, drbd_destroy_device);

                        rcu_read_lock();
                        if (rv)
                                break;
                }
                rcu_read_unlock();
        }
}

/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @connection: DRBD connection.
 * @epoch:      Epoch object.
 * @ev:         Epoch event.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
                                               struct drbd_epoch *epoch,
                                               enum epoch_event ev)
{
        int epoch_size;
        struct drbd_epoch *next_epoch;
        enum finish_epoch rv = FE_STILL_LIVE;

        spin_lock(&connection->epoch_lock);
        do {
                next_epoch = NULL;

                epoch_size = atomic_read(&epoch->epoch_size);

                switch (ev & ~EV_CLEANUP) {
                case EV_PUT:
                        atomic_dec(&epoch->active);
                        break;
                case EV_GOT_BARRIER_NR:
                        set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
                        break;
                case EV_BECAME_LAST:
                        /* nothing to do */
                        break;
                }

                if (epoch_size != 0 &&
                    atomic_read(&epoch->active) == 0 &&
                    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
                        if (!(ev & EV_CLEANUP)) {
                                spin_unlock(&connection->epoch_lock);
                                drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
                                spin_lock(&connection->epoch_lock);
                        }
#if 0
                        /* FIXME: dec unacked on connection, once we have
                         * something to count pending connection packets in. */
                        if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
                                dec_unacked(epoch->connection);
#endif

                        if (connection->current_epoch != epoch) {
                                next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
                                list_del(&epoch->list);
                                ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
                                connection->epochs--;
                                kfree(epoch);

                                if (rv == FE_STILL_LIVE)
                                        rv = FE_DESTROYED;
                        } else {
                                epoch->flags = 0;
                                atomic_set(&epoch->epoch_size, 0);
                                /* atomic_set(&epoch->active, 0); is already zero */
                                if (rv == FE_STILL_LIVE)
                                        rv = FE_RECYCLED;
                        }
                }

                if (!next_epoch)
                        break;

                epoch = next_epoch;
        } while (1);

        spin_unlock(&connection->epoch_lock);

        return rv;
}
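
/* Illustrative sketch (not part of the driver): the two epoch events
 * that usually drive the state machine above.  A completed write drops
 * its reference with EV_PUT; a P_BARRIER packet records its barrier
 * number with EV_GOT_BARRIER_NR.  Either may finish the epoch and send
 * the P_BARRIER_ACK.  The function name is made up for illustration. */
#if 0
static void epoch_event_sketch(struct drbd_connection *connection,
                               struct drbd_epoch *epoch)
{
        /* on local completion of a write belonging to this epoch: */
        if (drbd_may_finish_epoch(connection, epoch, EV_PUT) == FE_DESTROYED)
                return; /* the epoch was freed, do not touch it again */

        /* on receiving the epoch's barrier number from the peer: */
        drbd_may_finish_epoch(connection, epoch, EV_GOT_BARRIER_NR);
}
#endif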

static enum write_ordering_e
max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
{
        struct disk_conf *dc;

        dc = rcu_dereference(bdev->disk_conf);

        if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
                wo = WO_DRAIN_IO;
        if (wo == WO_DRAIN_IO && !dc->disk_drain)
                wo = WO_NONE;

        return wo;
}

/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @resource:   DRBD resource.
 * @bdev:       backing device to consider, may be NULL.
 * @wo:         Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
                              enum write_ordering_e wo)
{
        struct drbd_device *device;
        enum write_ordering_e pwo;
        int vnr;
        static char *write_ordering_str[] = {
                [WO_NONE] = "none",
                [WO_DRAIN_IO] = "drain",
                [WO_BDEV_FLUSH] = "flush",
        };

        pwo = resource->write_ordering;
        if (wo != WO_BDEV_FLUSH)
                wo = min(pwo, wo);
        rcu_read_lock();
        idr_for_each_entry(&resource->devices, device, vnr) {
                if (get_ldev(device)) {
                        wo = max_allowed_wo(device->ldev, wo);
                        if (device->ldev == bdev)
                                bdev = NULL;
                        put_ldev(device);
                }
        }

        if (bdev)
                wo = max_allowed_wo(bdev, wo);

        rcu_read_unlock();

        resource->write_ordering = wo;
        if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
                drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
}

/**
 * drbd_submit_peer_request() - Submit the I/O for a peer request to the local disk
 * @device:     DRBD device.
 * @peer_req:   peer request
 * @rw:         flag field, see bio->bi_rw
 * @fault_type: DRBD fault insertion type
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 *  single page to an empty bio (which should never happen and likely indicates
 *  that the lower level IO stack is in some way broken). This has been observed
 *  on certain Xen deployments.
 */
1398/* TODO allocate from our own bio_set. */
1399int drbd_submit_peer_request(struct drbd_device *device,
1400                             struct drbd_peer_request *peer_req,
1401                             const unsigned rw, const int fault_type)
1402{
1403        struct bio *bios = NULL;
1404        struct bio *bio;
1405        struct page *page = peer_req->pages;
1406        sector_t sector = peer_req->i.sector;
1407        unsigned data_size = peer_req->i.size;
1408        unsigned n_bios = 0;
1409        unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1410        int err = -ENOMEM;
1411
1412        if (peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) {
1413                /* wait for all pending IO completions, before we start
1414                 * zeroing things out. */
1415                conn_wait_active_ee_empty(peer_req->peer_device->connection);
1416                /* add it to the active list now,
1417                 * so we can find it to present it in debugfs */
1418                peer_req->submit_jif = jiffies;
1419                peer_req->flags |= EE_SUBMITTED;
1420                spin_lock_irq(&device->resource->req_lock);
1421                list_add_tail(&peer_req->w.list, &device->active_ee);
1422                spin_unlock_irq(&device->resource->req_lock);
1423                if (blkdev_issue_zeroout(device->ldev->backing_bdev,
1424                        sector, data_size >> 9, GFP_NOIO, false))
1425                        peer_req->flags |= EE_WAS_ERROR;
1426                drbd_endio_write_sec_final(peer_req);
1427                return 0;
1428        }
1429
1430        /* Discards don't have any payload.
1431         * But the scsi layer still expects a bio_vec it can use internally,
1432         * see sd_setup_discard_cmnd() and blk_add_request_payload(). */
1433        if (peer_req->flags & EE_IS_TRIM)
1434                nr_pages = 1;
1435
1436        /* In most cases, we will only need one bio.  But in case the lower
1437         * level restrictions happen to be different at this offset on this
1438         * side than those of the sending peer, we may need to submit the
1439         * request in more than one bio.
1440         *
1441 * Plain bio_alloc is good enough here; this is not a DRBD-internally
1442 * generated bio, but a bio allocated on behalf of the peer.
1443         */
1444next_bio:
1445        bio = bio_alloc(GFP_NOIO, nr_pages);
1446        if (!bio) {
1447                drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1448                goto fail;
1449        }
1450        /* > peer_req->i.sector, unless this is the first bio */
1451        bio->bi_iter.bi_sector = sector;
1452        bio->bi_bdev = device->ldev->backing_bdev;
1453        bio->bi_rw = rw;
1454        bio->bi_private = peer_req;
1455        bio->bi_end_io = drbd_peer_request_endio;
1456
1457        bio->bi_next = bios;
1458        bios = bio;
1459        ++n_bios;
1460
1461        if (rw & REQ_DISCARD) {
1462                bio->bi_iter.bi_size = data_size;
1463                goto submit;
1464        }
1465
1466        page_chain_for_each(page) {
1467                unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1468                if (!bio_add_page(bio, page, len, 0)) {
1469                        /* A single page must always be possible!
1470                         * But in case it fails anyways,
1471                         * we deal with it, and complain (below). */
1472                        if (bio->bi_vcnt == 0) {
1473                                drbd_err(device,
1474                                        "bio_add_page failed for len=%u, "
1475                                        "bi_vcnt=0 (bi_sector=%llu)\n",
1476                                        len, (uint64_t)bio->bi_iter.bi_sector);
1477                                err = -ENOSPC;
1478                                goto fail;
1479                        }
1480                        goto next_bio;
1481                }
1482                data_size -= len;
1483                sector += len >> 9;
1484                --nr_pages;
1485        }
1486        D_ASSERT(device, data_size == 0);
1487submit:
1488        D_ASSERT(device, page == NULL);
1489
1490        atomic_set(&peer_req->pending_bios, n_bios);
1491        /* for debugfs: update timestamp, mark as submitted */
1492        peer_req->submit_jif = jiffies;
1493        peer_req->flags |= EE_SUBMITTED;
1494        do {
1495                bio = bios;
1496                bios = bios->bi_next;
1497                bio->bi_next = NULL;
1498
1499                drbd_generic_make_request(device, fault_type, bio);
1500        } while (bios);
1501        return 0;
1502
1503fail:
1504        while (bios) {
1505                bio = bios;
1506                bios = bios->bi_next;
1507                bio_put(bio);
1508        }
1509        return err;
1510}
1511
1512static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1513                                             struct drbd_peer_request *peer_req)
1514{
1515        struct drbd_interval *i = &peer_req->i;
1516
1517        drbd_remove_interval(&device->write_requests, i);
1518        drbd_clear_interval(i);
1519
1520        /* Wake up any processes waiting for this peer request to complete.  */
1521        if (i->waiting)
1522                wake_up(&device->misc_wait);
1523}
1524
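    /* Wait until no peer write (active_ee) is in flight on any volume of
     * this connection.  The kref/rcu dance below keeps each device alive
     * while we sleep outside the RCU read-side critical section. */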
1525static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1526{
1527        struct drbd_peer_device *peer_device;
1528        int vnr;
1529
1530        rcu_read_lock();
1531        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1532                struct drbd_device *device = peer_device->device;
1533
1534                kref_get(&device->kref);
1535                rcu_read_unlock();
1536                drbd_wait_ee_list_empty(device, &device->active_ee);
1537                kref_put(&device->kref, drbd_destroy_device);
1538                rcu_read_lock();
1539        }
1540        rcu_read_unlock();
1541}
1542
1543static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1544{
1545        int rv;
1546        struct p_barrier *p = pi->data;
1547        struct drbd_epoch *epoch;
1548
1549        /* FIXME these are unacked on connection,
1550         * not a specific (peer)device.
1551         */
1552        connection->current_epoch->barrier_nr = p->barrier;
1553        connection->current_epoch->connection = connection;
1554        rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1555
1556        /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1557         * the activity log, which means it would not be resynced in case the
1558         * R_PRIMARY crashes now.
1559         * Therefore we must send the barrier_ack after the barrier request was
1560         * completed. */
1561        switch (connection->resource->write_ordering) {
1562        case WO_NONE:
1563                if (rv == FE_RECYCLED)
1564                        return 0;
1565
1566                /* receiver context, in the writeout path of the other node.
1567                 * avoid potential distributed deadlock */
1568                epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1569                if (epoch)
1570                        break;
1571                else
1572                        drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1573                /* Fall through */
1574
1575        case WO_BDEV_FLUSH:
1576        case WO_DRAIN_IO:
1577                conn_wait_active_ee_empty(connection);
1578                drbd_flush(connection);
1579
1580                if (atomic_read(&connection->current_epoch->epoch_size)) {
1581                        epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1582                        if (epoch)
1583                                break;
1584                }
1585
1586                return 0;
1587        default:
1588        drbd_err(connection, "Strangeness in resource->write_ordering %d\n",
1589                         connection->resource->write_ordering);
1590                return -EIO;
1591        }
1592
1593        epoch->flags = 0;
1594        atomic_set(&epoch->epoch_size, 0);
1595        atomic_set(&epoch->active, 0);
1596
1597        spin_lock(&connection->epoch_lock);
1598        if (atomic_read(&connection->current_epoch->epoch_size)) {
1599                list_add(&epoch->list, &connection->current_epoch->list);
1600                connection->current_epoch = epoch;
1601                connection->epochs++;
1602        } else {
1603                /* The current_epoch got recycled while we allocated this one... */
1604                kfree(epoch);
1605        }
1606        spin_unlock(&connection->epoch_lock);
1607
1608        return 0;
1609}
1610
1611/* used from receive_RSDataReply (recv_resync_read)
1612 * and from receive_Data */
1613static struct drbd_peer_request *
1614read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1615              struct packet_info *pi) __must_hold(local)
1616{
1617        struct drbd_device *device = peer_device->device;
1618        const sector_t capacity = drbd_get_capacity(device->this_bdev);
1619        struct drbd_peer_request *peer_req;
1620        struct page *page;
1621        int digest_size, err;
1622        unsigned int data_size = pi->size, ds;
1623        void *dig_in = peer_device->connection->int_dig_in;
1624        void *dig_vv = peer_device->connection->int_dig_vv;
1625        unsigned long *data;
1626        struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1627
1628        digest_size = 0;
1629        if (!trim && peer_device->connection->peer_integrity_tfm) {
1630                digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1631                /*
1632                 * FIXME: Receive the incoming digest into the receive buffer
1633                 *        here, together with its struct p_data?
1634                 */
1635                err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1636                if (err)
1637                        return NULL;
1638                data_size -= digest_size;
1639        }
1640
1641        if (trim) {
1642                D_ASSERT(peer_device, data_size == 0);
1643                data_size = be32_to_cpu(trim->size);
1644        }
1645
1646        if (!expect(IS_ALIGNED(data_size, 512)))
1647                return NULL;
1648        /* prepare for larger trim requests. */
1649        if (!trim && !expect(data_size <= DRBD_MAX_BIO_SIZE))
1650                return NULL;
1651
1652        /* even though we trust our peer,
1653         * we sometimes have to double-check. */
1654        if (sector + (data_size>>9) > capacity) {
1655                drbd_err(device, "request from peer beyond end of local disk: "
1656                        "capacity: %llus < sector: %llus + size: %u\n",
1657                        (unsigned long long)capacity,
1658                        (unsigned long long)sector, data_size);
1659                return NULL;
1660        }
1661
1662        /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1663         * "criss-cross" setup, that might cause write-out on some other DRBD,
1664         * which in turn might block on the other node at this very place.  */
1665        peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, trim == NULL, GFP_NOIO);
1666        if (!peer_req)
1667                return NULL;
1668
1669        peer_req->flags |= EE_WRITE;
1670        if (trim)
1671                return peer_req;
1672
1673        ds = data_size;
1674        page = peer_req->pages;
1675        page_chain_for_each(page) {
1676                unsigned len = min_t(int, ds, PAGE_SIZE);
1677                data = kmap(page);
1678                err = drbd_recv_all_warn(peer_device->connection, data, len);
1679                if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1680                        drbd_err(device, "Fault injection: Corrupting data on receive\n");
1681                        data[0] = data[0] ^ (unsigned long)-1;
1682                }
1683                kunmap(page);
1684                if (err) {
1685                        drbd_free_peer_req(device, peer_req);
1686                        return NULL;
1687                }
1688                ds -= len;
1689        }
1690
1691        if (digest_size) {
1692                drbd_csum_ee(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv);
1693                if (memcmp(dig_in, dig_vv, digest_size)) {
1694                        drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1695                                (unsigned long long)sector, data_size);
1696                        drbd_free_peer_req(device, peer_req);
1697                        return NULL;
1698                }
1699        }
1700        device->recv_cnt += data_size >> 9;
1701        return peer_req;
1702}
1703
1704/* drbd_drain_block() just takes a data block
1705 * out of the socket input buffer, and discards it.
1706 */
1707static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1708{
1709        struct page *page;
1710        int err = 0;
1711        void *data;
1712
1713        if (!data_size)
1714                return 0;
1715
1716        page = drbd_alloc_pages(peer_device, 1, 1);
1717
1718        data = kmap(page);
1719        while (data_size) {
1720                unsigned int len = min_t(int, data_size, PAGE_SIZE);
1721
1722                err = drbd_recv_all_warn(peer_device->connection, data, len);
1723                if (err)
1724                        break;
1725                data_size -= len;
1726        }
1727        kunmap(page);
1728        drbd_free_pages(peer_device->device, page, 0);
1729        return err;
1730}
1731
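    /* recv_dless_read(): a "disk-less" read served by the peer.  Copy the
     * payload directly into the pages of the original application bio and
     * verify the integrity digest, if one is configured. */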
1732static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1733                           sector_t sector, int data_size)
1734{
1735        struct bio_vec bvec;
1736        struct bvec_iter iter;
1737        struct bio *bio;
1738        int digest_size, err, expect;
1739        void *dig_in = peer_device->connection->int_dig_in;
1740        void *dig_vv = peer_device->connection->int_dig_vv;
1741
1742        digest_size = 0;
1743        if (peer_device->connection->peer_integrity_tfm) {
1744                digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1745                err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1746                if (err)
1747                        return err;
1748                data_size -= digest_size;
1749        }
1750
1751        /* optimistically update recv_cnt.  if receiving fails below,
1752         * we disconnect anyways, and counters will be reset. */
1753        peer_device->device->recv_cnt += data_size>>9;
1754
1755        bio = req->master_bio;
1756        D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1757
1758        bio_for_each_segment(bvec, bio, iter) {
1759                void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1760                expect = min_t(int, data_size, bvec.bv_len);
1761                err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1762                kunmap(bvec.bv_page);
1763                if (err)
1764                        return err;
1765                data_size -= expect;
1766        }
1767
1768        if (digest_size) {
1769                drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1770                if (memcmp(dig_in, dig_vv, digest_size)) {
1771                        drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1772                        return -EINVAL;
1773                }
1774        }
1775
1776        D_ASSERT(peer_device->device, data_size == 0);
1777        return 0;
1778}
1779
1780/*
1781 * e_end_resync_block() is called in ack_sender context via
1782 * drbd_finish_peer_reqs().
1783 */
1784static int e_end_resync_block(struct drbd_work *w, int unused)
1785{
1786        struct drbd_peer_request *peer_req =
1787                container_of(w, struct drbd_peer_request, w);
1788        struct drbd_peer_device *peer_device = peer_req->peer_device;
1789        struct drbd_device *device = peer_device->device;
1790        sector_t sector = peer_req->i.sector;
1791        int err;
1792
1793        D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1794
1795        if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1796                drbd_set_in_sync(device, sector, peer_req->i.size);
1797                err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1798        } else {
1799                /* Record failure to sync */
1800                drbd_rs_failed_io(device, sector, peer_req->i.size);
1801
1802                err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1803        }
1804        dec_unacked(device);
1805
1806        return err;
1807}
1808
1809static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1810                            struct packet_info *pi) __releases(local)
1811{
1812        struct drbd_device *device = peer_device->device;
1813        struct drbd_peer_request *peer_req;
1814
1815        peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
1816        if (!peer_req)
1817                goto fail;
1818
1819        dec_rs_pending(device);
1820
1821        inc_unacked(device);
1822        /* corresponding dec_unacked() in e_end_resync_block(),
1823         * or in _drbd_clear_done_ee */
1824
1825        peer_req->w.cb = e_end_resync_block;
1826        peer_req->submit_jif = jiffies;
1827
1828        spin_lock_irq(&device->resource->req_lock);
1829        list_add_tail(&peer_req->w.list, &device->sync_ee);
1830        spin_unlock_irq(&device->resource->req_lock);
1831
1832        atomic_add(pi->size >> 9, &device->rs_sect_ev);
1833        if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1834                return 0;
1835
1836        /* don't care for the reason here */
1837        drbd_err(device, "submit failed, triggering re-connect\n");
1838        spin_lock_irq(&device->resource->req_lock);
1839        list_del(&peer_req->w.list);
1840        spin_unlock_irq(&device->resource->req_lock);
1841
1842        drbd_free_peer_req(device, peer_req);
1843fail:
1844        put_ldev(device);
1845        return -EIO;
1846}
1847
1848static struct drbd_request *
1849find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1850             sector_t sector, bool missing_ok, const char *func)
1851{
1852        struct drbd_request *req;
1853
1854        /* Request object according to our peer */
1855        req = (struct drbd_request *)(unsigned long)id;
1856        if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1857                return req;
1858        if (!missing_ok) {
1859                drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1860                        (unsigned long)id, (unsigned long long)sector);
1861        }
1862        return NULL;
1863}
1864
1865static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
1866{
1867        struct drbd_peer_device *peer_device;
1868        struct drbd_device *device;
1869        struct drbd_request *req;
1870        sector_t sector;
1871        int err;
1872        struct p_data *p = pi->data;
1873
1874        peer_device = conn_peer_device(connection, pi->vnr);
1875        if (!peer_device)
1876                return -EIO;
1877        device = peer_device->device;
1878
1879        sector = be64_to_cpu(p->sector);
1880
1881        spin_lock_irq(&device->resource->req_lock);
1882        req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
1883        spin_unlock_irq(&device->resource->req_lock);
1884        if (unlikely(!req))
1885                return -EIO;
1886
1887        /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1888         * special casing it there for the various failure cases.
1889         * still no race with drbd_fail_pending_reads */
1890        err = recv_dless_read(peer_device, req, sector, pi->size);
1891        if (!err)
1892                req_mod(req, DATA_RECEIVED);
1893        /* else: nothing. handled from drbd_disconnect...
1894         * I don't think we may complete this just yet
1895         * in case we are "on-disconnect: freeze" */
1896
1897        return err;
1898}
1899
1900static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
1901{
1902        struct drbd_peer_device *peer_device;
1903        struct drbd_device *device;
1904        sector_t sector;
1905        int err;
1906        struct p_data *p = pi->data;
1907
1908        peer_device = conn_peer_device(connection, pi->vnr);
1909        if (!peer_device)
1910                return -EIO;
1911        device = peer_device->device;
1912
1913        sector = be64_to_cpu(p->sector);
1914        D_ASSERT(device, p->block_id == ID_SYNCER);
1915
1916        if (get_ldev(device)) {
1917                /* data is submitted to disk within recv_resync_read.
1918                 * corresponding put_ldev done below on error,
1919                 * or in drbd_peer_request_endio. */
1920                err = recv_resync_read(peer_device, sector, pi);
1921        } else {
1922                if (__ratelimit(&drbd_ratelimit_state))
1923                        drbd_err(device, "Can not write resync data to local disk.\n");
1924
1925                err = drbd_drain_block(peer_device, pi->size);
1926
1927                drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
1928        }
1929
1930        atomic_add(pi->size >> 9, &device->rs_sect_in);
1931
1932        return err;
1933}
1934
1935static void restart_conflicting_writes(struct drbd_device *device,
1936                                       sector_t sector, int size)
1937{
1938        struct drbd_interval *i;
1939        struct drbd_request *req;
1940
1941        drbd_for_each_overlap(i, &device->write_requests, sector, size) {
1942                if (!i->local)
1943                        continue;
1944                req = container_of(i, struct drbd_request, i);
1945                if (req->rq_state & RQ_LOCAL_PENDING ||
1946                    !(req->rq_state & RQ_POSTPONED))
1947                        continue;
1948                /* as it is RQ_POSTPONED, this will cause it to
1949                 * be queued on the retry workqueue. */
1950                __req_mod(req, CONFLICT_RESOLVED, NULL);
1951        }
1952}
1953
1954/*
1955 * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
1956 */
1957static int e_end_block(struct drbd_work *w, int cancel)
1958{
1959        struct drbd_peer_request *peer_req =
1960                container_of(w, struct drbd_peer_request, w);
1961        struct drbd_peer_device *peer_device = peer_req->peer_device;
1962        struct drbd_device *device = peer_device->device;
1963        sector_t sector = peer_req->i.sector;
1964        int err = 0, pcmd;
1965
1966        if (peer_req->flags & EE_SEND_WRITE_ACK) {
1967                if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1968                        pcmd = (device->state.conn >= C_SYNC_SOURCE &&
1969                                device->state.conn <= C_PAUSED_SYNC_T &&
1970                                peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1971                                P_RS_WRITE_ACK : P_WRITE_ACK;
1972                        err = drbd_send_ack(peer_device, pcmd, peer_req);
1973                        if (pcmd == P_RS_WRITE_ACK)
1974                                drbd_set_in_sync(device, sector, peer_req->i.size);
1975                } else {
1976                        err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1977                        /* we expect it to be marked out of sync anyways...
1978                         * maybe assert this?  */
1979                }
1980                dec_unacked(device);
1981        }
1982
1983        /* we delete from the conflict detection hash _after_ we sent out the
1984         * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1985        if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1986                spin_lock_irq(&device->resource->req_lock);
1987                D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
1988                drbd_remove_epoch_entry_interval(device, peer_req);
1989                if (peer_req->flags & EE_RESTART_REQUESTS)
1990                        restart_conflicting_writes(device, sector, peer_req->i.size);
1991                spin_unlock_irq(&device->resource->req_lock);
1992        } else
1993                D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1994
1995        drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1996
1997        return err;
1998}
1999
2000static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2001{
2002        struct drbd_peer_request *peer_req =
2003                container_of(w, struct drbd_peer_request, w);
2004        struct drbd_peer_device *peer_device = peer_req->peer_device;
2005        int err;
2006
2007        err = drbd_send_ack(peer_device, ack, peer_req);
2008        dec_unacked(peer_device->device);
2009
2010        return err;
2011}
2012
2013static int e_send_superseded(struct drbd_work *w, int unused)
2014{
2015        return e_send_ack(w, P_SUPERSEDED);
2016}
2017
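    /* Peers older than protocol 100 do not know P_RETRY_WRITE;
     * P_SUPERSEDED is the closest equivalent they understand. */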
2018static int e_send_retry_write(struct drbd_work *w, int unused)
2019{
2020        struct drbd_peer_request *peer_req =
2021                container_of(w, struct drbd_peer_request, w);
2022        struct drbd_connection *connection = peer_req->peer_device->connection;
2023
2024        return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2025                             P_RETRY_WRITE : P_SUPERSEDED);
2026}
2027
2028static bool seq_greater(u32 a, u32 b)
2029{
2030        /*
2031         * We assume 32-bit wrap-around here.
2032         * For 24-bit wrap-around, we would have to shift:
2033         *  a <<= 8; b <<= 8;
2034         */
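            /*
             * Worked example: with a == 1 and b == 0xffffffff (b just
             * wrapped), (s32)a - (s32)b == 1 - (-1) == 2 > 0, so a is
             * correctly treated as the newer sequence number even though
             * it is numerically smaller.
             */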
2035        return (s32)a - (s32)b > 0;
2036}
2037
2038static u32 seq_max(u32 a, u32 b)
2039{
2040        return seq_greater(a, b) ? a : b;
2041}
2042
2043static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2044{
2045        struct drbd_device *device = peer_device->device;
2046        unsigned int newest_peer_seq;
2047
2048        if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2049                spin_lock(&device->peer_seq_lock);
2050                newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2051                device->peer_seq = newest_peer_seq;
2052                spin_unlock(&device->peer_seq_lock);
2053                /* wake up only if we actually changed device->peer_seq */
2054                if (peer_seq == newest_peer_seq)
2055                        wake_up(&device->seq_wait);
2056        }
2057}
2058
2059static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2060{
2061        return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2062}
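    /* Example: s1 == 0, l1 == 4096 (8 sectors), s2 == 8: then
     * s1 + (l1>>9) == 8 <= s2, the ranges merely touch, and this reports
     * no overlap.  Note the asymmetry: s1/s2 are sector numbers, while
     * l1/l2 are byte counts. */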
2063
2064/* maybe change sync_ee into interval trees as well? */
2065static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2066{
2067        struct drbd_peer_request *rs_req;
2068        bool rv = false;
2069
2070        spin_lock_irq(&device->resource->req_lock);
2071        list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2072                if (overlaps(peer_req->i.sector, peer_req->i.size,
2073                             rs_req->i.sector, rs_req->i.size)) {
2074                        rv = true;
2075                        break;
2076                }
2077        }
2078        spin_unlock_irq(&device->resource->req_lock);
2079
2080        return rv;
2081}
2082
2083/* Called from receive_Data.
2084 * Synchronize packets on sock with packets on msock.
2085 *
2086 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2087 * packet traveling on msock, they are still processed in the order they have
2088 * been sent.
2089 *
2090 * Note: we don't care for Ack packets overtaking P_DATA packets.
2091 *
2092 * In case packet_seq is larger than device->peer_seq, there are
2093 * outstanding packets on the msock. We wait for them to arrive.
2094 * In case we are the logically next packet, we update device->peer_seq
2095 * ourselves. Correctly handles 32bit wrap around.
2096 *
2097 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2098 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2099 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2100 * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
2101 *
2102 * returns 0 if we may process the packet,
2103 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2104static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2105{
2106        struct drbd_device *device = peer_device->device;
2107        DEFINE_WAIT(wait);
2108        long timeout;
2109        int ret = 0, tp;
2110
2111        if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2112                return 0;
2113
2114        spin_lock(&device->peer_seq_lock);
2115        for (;;) {
2116                if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2117                        device->peer_seq = seq_max(device->peer_seq, peer_seq);
2118                        break;
2119                }
2120
2121                if (signal_pending(current)) {
2122                        ret = -ERESTARTSYS;
2123                        break;
2124                }
2125
2126                rcu_read_lock();
2127                tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2128                rcu_read_unlock();
2129
2130                if (!tp)
2131                        break;
2132
2133                /* Only need to wait if two_primaries is enabled */
2134                prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2135                spin_unlock(&device->peer_seq_lock);
2136                rcu_read_lock();
2137                timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2138                rcu_read_unlock();
2139                timeout = schedule_timeout(timeout);
2140                spin_lock(&device->peer_seq_lock);
2141                if (!timeout) {
2142                        ret = -ETIMEDOUT;
2143                        drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2144                        break;
2145                }
2146        }
2147        spin_unlock(&device->peer_seq_lock);
2148        finish_wait(&device->seq_wait, &wait);
2149        return ret;
2150}
2151
2152/* see also bio_flags_to_wire()
2153 * DRBD_REQ_*, because we need to semantically map the flags to data packet
2154 * flags and back.  We may replicate to peers running other kernel versions. */
2155static unsigned long wire_flags_to_bio(u32 dpf)
2156{
2157        return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2158                (dpf & DP_FUA ? REQ_FUA : 0) |
2159                (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2160                (dpf & DP_DISCARD ? REQ_DISCARD : 0);
2161}
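    /* E.g. a peer write flagged DP_FUA|DP_FLUSH is submitted locally with
     * REQ_FUA|REQ_FLUSH, preserving the durability semantics the peer
     * requested. */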
2162
2163static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2164                                    unsigned int size)
2165{
2166        struct drbd_interval *i;
2167
2168    repeat:
2169        drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2170                struct drbd_request *req;
2171                struct bio_and_error m;
2172
2173                if (!i->local)
2174                        continue;
2175                req = container_of(i, struct drbd_request, i);
2176                if (!(req->rq_state & RQ_POSTPONED))
2177                        continue;
2178                req->rq_state &= ~RQ_POSTPONED;
2179                __req_mod(req, NEG_ACKED, &m);
2180                spin_unlock_irq(&device->resource->req_lock);
2181                if (m.bio)
2182                        complete_master_bio(device, &m);
2183                spin_lock_irq(&device->resource->req_lock);
2184                goto repeat;
2185        }
2186}
2187
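    /* Returns 0 if the new peer request may be submitted, -ENOENT if it was
     * superseded or queued for retry (the corresponding ack has already been
     * queued in that case), or another negative error if waiting for a
     * conflicting request failed. */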
2188static int handle_write_conflicts(struct drbd_device *device,
2189                                  struct drbd_peer_request *peer_req)
2190{
2191        struct drbd_connection *connection = peer_req->peer_device->connection;
2192        bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2193        sector_t sector = peer_req->i.sector;
2194        const unsigned int size = peer_req->i.size;
2195        struct drbd_interval *i;
2196        bool equal;
2197        int err;
2198
2199        /*
2200         * Inserting the peer request into the write_requests tree will prevent
2201         * new conflicting local requests from being added.
2202         */
2203        drbd_insert_interval(&device->write_requests, &peer_req->i);
2204
2205    repeat:
2206        drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2207                if (i == &peer_req->i)
2208                        continue;
2209                if (i->completed)
2210                        continue;
2211
2212                if (!i->local) {
2213                        /*
2214                         * Our peer has sent a conflicting remote request; this
2215                         * should not happen in a two-node setup.  Wait for the
2216                         * earlier peer request to complete.
2217                         */
2218                        err = drbd_wait_misc(device, i);
2219                        if (err)
2220                                goto out;
2221                        goto repeat;
2222                }
2223
2224                equal = i->sector == sector && i->size == size;
2225                if (resolve_conflicts) {
2226                        /*
2227                         * If the peer request is fully contained within the
2228                         * overlapping request, it can be considered overwritten
2229                         * and thus superseded; otherwise, it will be retried
2230                         * once all overlapping requests have completed.
2231                         */
2232                        bool superseded = i->sector <= sector && i->sector +
2233                                       (i->size >> 9) >= sector + (size >> 9);
2234
2235                        if (!equal)
2236                                drbd_alert(device, "Concurrent writes detected: "
2237                                               "local=%llus +%u, remote=%llus +%u, "
2238                                               "assuming %s came first\n",
2239                                          (unsigned long long)i->sector, i->size,
2240                                          (unsigned long long)sector, size,
2241                                          superseded ? "local" : "remote");
2242
2243                        peer_req->w.cb = superseded ? e_send_superseded :
2244                                                   e_send_retry_write;
2245                        list_add_tail(&peer_req->w.list, &device->done_ee);
2246                        queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);
2247
2248                        err = -ENOENT;
2249                        goto out;
2250                } else {
2251                        struct drbd_request *req =
2252                                container_of(i, struct drbd_request, i);
2253
2254                        if (!equal)
2255                                drbd_alert(device, "Concurrent writes detected: "
2256                                               "local=%llus +%u, remote=%llus +%u\n",
2257                                          (unsigned long long)i->sector, i->size,
2258                                          (unsigned long long)sector, size);
2259
2260                        if (req->rq_state & RQ_LOCAL_PENDING ||
2261                            !(req->rq_state & RQ_POSTPONED)) {
2262                                /*
2263                                 * Wait for the node with the discard flag to
2264                                 * decide if this request has been superseded
2265                                 * or needs to be retried.
2266                                 * Requests that have been superseded will
2267                                 * disappear from the write_requests tree.
2268                                 *
2269                                 * In addition, wait for the conflicting
2270                                 * request to finish locally before submitting
2271                                 * the conflicting peer request.
2272                                 */
2273                                err = drbd_wait_misc(device, &req->i);
2274                                if (err) {
2275                                        _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2276                                        fail_postponed_requests(device, sector, size);
2277                                        goto out;
2278                                }
2279                                goto repeat;
2280                        }
2281                        /*
2282                         * Remember to restart the conflicting requests after
2283                         * the new peer request has completed.
2284                         */
2285                        peer_req->flags |= EE_RESTART_REQUESTS;
2286                }
2287        }
2288        err = 0;
2289
2290    out:
2291        if (err)
2292                drbd_remove_epoch_entry_interval(device, peer_req);
2293        return err;
2294}
2295
2296/* mirrored write */
2297static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2298{
2299        struct drbd_peer_device *peer_device;
2300        struct drbd_device *device;
2301        struct net_conf *nc;
2302        sector_t sector;
2303        struct drbd_peer_request *peer_req;
2304        struct p_data *p = pi->data;
2305        u32 peer_seq = be32_to_cpu(p->seq_num);
2306        int rw = WRITE;
2307        u32 dp_flags;
2308        int err, tp;
2309
2310        peer_device = conn_peer_device(connection, pi->vnr);
2311        if (!peer_device)
2312                return -EIO;
2313        device = peer_device->device;
2314
2315        if (!get_ldev(device)) {
2316                int err2;
2317
2318                err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2319                drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2320                atomic_inc(&connection->current_epoch->epoch_size);
2321                err2 = drbd_drain_block(peer_device, pi->size);
2322                if (!err)
2323                        err = err2;
2324                return err;
2325        }
2326
2327        /*
2328         * Corresponding put_ldev done either below (on various errors), or in
2329         * drbd_peer_request_endio, if we successfully submit the data at the
2330         * end of this function.
2331         */
2332
2333        sector = be64_to_cpu(p->sector);
2334        peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2335        if (!peer_req) {
2336                put_ldev(device);
2337                return -EIO;
2338        }
2339
2340        peer_req->w.cb = e_end_block;
2341        peer_req->submit_jif = jiffies;
2342        peer_req->flags |= EE_APPLICATION;
2343
2344        dp_flags = be32_to_cpu(p->dp_flags);
2345        rw |= wire_flags_to_bio(dp_flags);
2346        if (pi->cmd == P_TRIM) {
2347                struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
2348                peer_req->flags |= EE_IS_TRIM;
2349                if (!blk_queue_discard(q))
2350                        peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT;
2351                D_ASSERT(peer_device, peer_req->i.size > 0);
2352                D_ASSERT(peer_device, rw & REQ_DISCARD);
2353                D_ASSERT(peer_device, peer_req->pages == NULL);
2354        } else if (peer_req->pages == NULL) {
2355                D_ASSERT(device, peer_req->i.size == 0);
2356                D_ASSERT(device, dp_flags & DP_FLUSH);
2357        }
2358
2359        if (dp_flags & DP_MAY_SET_IN_SYNC)
2360                peer_req->flags |= EE_MAY_SET_IN_SYNC;
2361
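            /* Account this write in the current epoch: epoch_size counts all
             * writes of the epoch, active those not yet completed (released
             * via EV_PUT in e_end_block()). */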
2362        spin_lock(&connection->epoch_lock);
2363        peer_req->epoch = connection->current_epoch;
2364        atomic_inc(&peer_req->epoch->epoch_size);
2365        atomic_inc(&peer_req->epoch->active);
2366        spin_unlock(&connection->epoch_lock);
2367
2368        rcu_read_lock();
2369        nc = rcu_dereference(peer_device->connection->net_conf);
2370        tp = nc->two_primaries;
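            /* Peers speaking protocol 100 or newer set DP_SEND_WRITE_ACK /
             * DP_SEND_RECEIVE_ACK in dp_flags themselves; for older peers we
             * derive the ack policy from the configured wire protocol
             * (C: write ack, B: receive ack, A: neither). */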
2371        if (peer_device->connection->agreed_pro_version < 100) {
2372                switch (nc->wire_protocol) {
2373                case DRBD_PROT_C:
2374                        dp_flags |= DP_SEND_WRITE_ACK;
2375                        break;
2376                case DRBD_PROT_B:
2377                        dp_flags |= DP_SEND_RECEIVE_ACK;
2378                        break;
2379                }
2380        }
2381        rcu_read_unlock();
2382
2383        if (dp_flags & DP_SEND_WRITE_ACK) {
2384                peer_req->flags |= EE_SEND_WRITE_ACK;
2385                inc_unacked(device);
2386                /* corresponding dec_unacked() in e_end_block(),
2387                 * or in _drbd_clear_done_ee */
2388        }
2389
2390        if (dp_flags & DP_SEND_RECEIVE_ACK) {
2391                /* I really don't like it that the receiver thread
2392                 * sends on the msock, but anyways */
2393                drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
2394        }
2395
2396        if (tp) {
2397                /* two primaries implies protocol C */
2398                D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2399                peer_req->flags |= EE_IN_INTERVAL_TREE;
2400                err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2401                if (err)
2402                        goto out_interrupted;
2403                spin_lock_irq(&device->resource->req_lock);
2404                err = handle_write_conflicts(device, peer_req);
2405                if (err) {
2406                        spin_unlock_irq(&device->resource->req_lock);
2407                        if (err == -ENOENT) {
2408                                put_ldev(device);
2409                                return 0;
2410                        }
2411                        goto out_interrupted;
2412                }
2413        } else {
2414                update_peer_seq(peer_device, peer_seq);
2415                spin_lock_irq(&device->resource->req_lock);
2416        }
2417        /* if we use the zeroout fallback code, we process synchronously
2418         * and wait for all pending requests, i.e. for active_ee to
2419         * become empty, in drbd_submit_peer_request();
2420         * better not add ourselves here. */
2421        if ((peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) == 0)
2422                list_add_tail(&peer_req->w.list, &device->active_ee);
2423        spin_unlock_irq(&device->resource->req_lock);
2424
2425        if (device->state.conn == C_SYNC_TARGET)
2426                wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2427
2428        if (device->state.pdsk < D_INCONSISTENT) {
2429                /* In case we have the only disk of the cluster, the peer
                     * cannot store this write: track it as out of sync. */
2430                drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2431                peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2432                drbd_al_begin_io(device, &peer_req->i);
2433                peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2434        }
2435
2436        err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR);
2437        if (!err)
2438                return 0;
2439
2440        /* don't care for the reason here */
2441        drbd_err(device, "submit failed, triggering re-connect\n");
2442        spin_lock_irq(&device->resource->req_lock);
2443        list_del(&peer_req->w.list);
2444        drbd_remove_epoch_entry_interval(device, peer_req);
2445        spin_unlock_irq(&device->resource->req_lock);
2446        if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2447                peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2448                drbd_al_complete_io(device, &peer_req->i);
2449        }
2450
2451out_interrupted:
2452        drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
2453        put_ldev(device);
2454        drbd_free_peer_req(device, peer_req);
2455        return err;
2456}
2457
2458/* We may throttle resync, if the lower device seems to be busy,
2459 * and current sync rate is above c_min_rate.
2460 *
2461 * To decide whether or not the lower device is busy, we use a scheme similar
2462 * to MD RAID is_mddev_idle(): if the partition stats reveal a "significant"
2463 * amount (more than 64 sectors) of activity we cannot account for with our own resync
2464 * activity, it obviously is "busy".
2465 *
2466 * The current sync rate used here uses only the most recent two step marks,
2467 * to have a short time average so we can react faster.
2468 */
2469bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2470                bool throttle_if_app_is_waiting)
2471{
2472        struct lc_element *tmp;
2473        bool throttle = drbd_rs_c_min_rate_throttle(device);
2474
2475        if (!throttle || throttle_if_app_is_waiting)
2476                return throttle;
2477
2478        spin_lock_irq(&device->al_lock);
2479        tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2480        if (tmp) {
2481                struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2482                if (test_bit(BME_PRIORITY, &bm_ext->flags))
2483                        throttle = false;
2484                /* Do not slow down if app IO is already waiting for this extent,
2485                 * and our progress is necessary for application IO to complete. */
2486        }
2487        spin_unlock_irq(&device->al_lock);
2488
2489        return throttle;
2490}
2491
2492bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2493{
2494        struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2495        unsigned long db, dt, dbdt;
2496        unsigned int c_min_rate;
2497        int curr_events;
2498
2499        rcu_read_lock();
2500        c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2501        rcu_read_unlock();
2502
2503        /* feature disabled? */
2504        if (c_min_rate == 0)
2505                return false;
2506
2507        curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2508                      (int)part_stat_read(&disk->part0, sectors[1]) -
2509                        atomic_read(&device->rs_sect_ev);
2510
2511        if (atomic_read(&device->ap_actlog_cnt)
2512            || curr_events - device->rs_last_events > 64) {
2513                unsigned long rs_left;
2514                int i;
2515
2516                device->rs_last_events = curr_events;
2517
2518                /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2519                 * approx. */
2520                i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2521
2522                if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2523                        rs_left = device->ov_left;
2524                else
2525                        rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2526
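                    /* dt: seconds since that sync mark; db: bitmap bits that
                     * got resynced since then; dbdt: the resulting short-term
                     * rate in KiB/s, compared against c_min_rate (KiB/s). */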
2527                dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2528                if (!dt)
2529                        dt++;
2530                db = device->rs_mark_left[i] - rs_left;
2531                dbdt = Bit2KB(db/dt);
2532
2533                if (dbdt > c_min_rate)
2534                        return true;
2535        }
2536        return false;
2537}
2538
2539static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2540{
2541        struct drbd_peer_device *peer_device;
2542        struct drbd_device *device;
2543        sector_t sector;
2544        sector_t capacity;
2545        struct drbd_peer_request *peer_req;
2546        struct digest_info *di = NULL;
2547        int size, verb;
2548        unsigned int fault_type;
2549        struct p_block_req *p = pi->data;
2550
2551        peer_device = conn_peer_device(connection, pi->vnr);
2552        if (!peer_device)
2553                return -EIO;
2554        device = peer_device->device;
2555        capacity = drbd_get_capacity(device->this_bdev);
2556
2557        sector = be64_to_cpu(p->sector);
2558        size   = be32_to_cpu(p->blksize);
2559
2560        if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2561                drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2562                                (unsigned long long)sector, size);
2563                return -EINVAL;
2564        }
2565        if (sector + (size>>9) > capacity) {
2566                drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2567                                (unsigned long long)sector, size);
2568                return -EINVAL;
2569        }
2570
2571        if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2572                verb = 1;
2573                switch (pi->cmd) {
2574                case P_DATA_REQUEST:
2575                        drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2576                        break;
2577                case P_RS_DATA_REQUEST:
2578                case P_CSUM_RS_REQUEST:
2579                case P_OV_REQUEST:
2580                        drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY, p);
2581                        break;
2582                case P_OV_REPLY:
2583                        verb = 0;
2584                        dec_rs_pending(device);
2585                        drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2586                        break;
2587                default:
2588                        BUG();
2589                }
2590                if (verb && __ratelimit(&drbd_ratelimit_state))
2591                        drbd_err(device, "Can not satisfy peer's read request, "
2592                            "no local data.\n");
2593
2594                /* drain the payload, if any */
2595                return drbd_drain_block(peer_device, pi->size);
2596        }
2597
2598        /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2599         * "criss-cross" setup, that might cause write-out on some other DRBD,
2600         * which in turn might block on the other node at this very place.  */
2601        peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2602                        true /* has real payload */, GFP_NOIO);
2603        if (!peer_req) {
2604                put_ldev(device);
2605                return -ENOMEM;
2606        }
2607
2608        switch (pi->cmd) {
2609        case P_DATA_REQUEST:
2610                peer_req->w.cb = w_e_end_data_req;
2611                fault_type = DRBD_FAULT_DT_RD;
2612                /* application IO, don't drbd_rs_begin_io */
2613                peer_req->flags |= EE_APPLICATION;
2614                goto submit;
2615
2616        case P_RS_DATA_REQUEST:
2617                peer_req->w.cb = w_e_end_rsdata_req;
2618                fault_type = DRBD_FAULT_RS_RD;
2619                /* used in the sector offset progress display */
2620                device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2621                break;
2622
2623        case P_OV_REPLY:
2624        case P_CSUM_RS_REQUEST:
2625                fault_type = DRBD_FAULT_RS_RD;
2626                di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2627                if (!di)
2628                        goto out_free_e;
2629
2630                di->digest_size = pi->size;
2631                di->digest = (((char *)di)+sizeof(struct digest_info));
2632
2633                peer_req->digest = di;
2634                peer_req->flags |= EE_HAS_DIGEST;
2635
2636                if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2637                        goto out_free_e;
2638
2639                if (pi->cmd == P_CSUM_RS_REQUEST) {
2640                        D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2641                        peer_req->w.cb = w_e_end_csum_rs_req;
2642                        /* used in the sector offset progress display */
2643                        device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2644                        /* remember to report stats in drbd_resync_finished */
2645                        device->use_csums = true;
2646                } else if (pi->cmd == P_OV_REPLY) {
2647                        /* track progress, we may need to throttle */
2648                        atomic_add(size >> 9, &device->rs_sect_in);
2649                        peer_req->w.cb = w_e_end_ov_reply;
2650                        dec_rs_pending(device);
2651                        /* drbd_rs_begin_io done when we sent this request,
2652                         * but accounting still needs to be done. */
2653                        goto submit_for_resync;
2654                }
2655                break;
2656
2657        case P_OV_REQUEST:
2658                if (device->ov_start_sector == ~(sector_t)0 &&
2659                    peer_device->connection->agreed_pro_version >= 90) {
2660                        unsigned long now = jiffies;
2661                        int i;
2662                        device->ov_start_sector = sector;
2663                        device->ov_position = sector;
2664                        device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2665                        device->rs_total = device->ov_left;
2666                        for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2667                                device->rs_mark_left[i] = device->ov_left;
2668                                device->rs_mark_time[i] = now;
2669                        }
2670                        drbd_info(device, "Online Verify start sector: %llu\n",
2671                                        (unsigned long long)sector);
2672                }
2673                peer_req->w.cb = w_e_end_ov_req;
2674                fault_type = DRBD_FAULT_RS_RD;
2675                break;
2676
2677        default:
2678                BUG();
2679        }
2680
2681        /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2682         * wrt the receiver, but it is not as straightforward as it may seem.
2683         * Various places in the resync start and stop logic assume resync
2684         * requests are processed in order, requeuing this on the worker thread
2685         * introduces a bunch of new code for synchronization between threads.
2686         *
2687         * Unlimited throttling before drbd_rs_begin_io may stall the resync
2688         * "forever", throttling after drbd_rs_begin_io will lock that extent
2689         * for application writes for the same time.  For now, just throttle
2690         * here, where the rest of the code expects the receiver to sleep for
2691         * a while, anyways.
2692         */
2693
2694        /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2695         * this defers syncer requests for some time, before letting at least
2696 * one request through.  The resync controller on the receiving side
2697         * will adapt to the incoming rate accordingly.
2698         *
2699         * We cannot throttle here if remote is Primary/SyncTarget:
2700         * we would also throttle its application reads.
2701         * In that case, throttling is done on the SyncTarget only.
2702         */
2703
2704        /* Even though this may be a resync request, we do add to "read_ee";
2705         * "sync_ee" is only used for resync WRITEs.
2706         * Add to list early, so debugfs can find this request
2707         * even if we have to sleep below. */
2708        spin_lock_irq(&device->resource->req_lock);
2709        list_add_tail(&peer_req->w.list, &device->read_ee);
2710        spin_unlock_irq(&device->resource->req_lock);
2711
2712        update_receiver_timing_details(connection, drbd_rs_should_slow_down);
2713        if (device->state.peer != R_PRIMARY &&
2714            drbd_rs_should_slow_down(device, sector, false))
2715                schedule_timeout_uninterruptible(HZ/10);
2716        update_receiver_timing_details(connection, drbd_rs_begin_io);
2717        if (drbd_rs_begin_io(device, sector))
2718                goto out_free_e;
2719
2720submit_for_resync:
2721        atomic_add(size >> 9, &device->rs_sect_ev);
2722
2723submit:
2724        update_receiver_timing_details(connection, drbd_submit_peer_request);
2725        inc_unacked(device);
2726        if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0)
2727                return 0;
2728
2729        /* don't care for the reason here */
2730        drbd_err(device, "submit failed, triggering re-connect\n");
2731
2732out_free_e:
2733        spin_lock_irq(&device->resource->req_lock);
2734        list_del(&peer_req->w.list);
2735        spin_unlock_irq(&device->resource->req_lock);
2736        /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2737
2738        put_ldev(device);
2739        drbd_free_peer_req(device, peer_req);
2740        return -EIO;
2741}
2742
2743/**
2744 * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
2745 */
2746static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2747{
2748        struct drbd_device *device = peer_device->device;
2749        int self, peer, rv = -100;
2750        unsigned long ch_self, ch_peer;
2751        enum drbd_after_sb_p after_sb_0p;
2752
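            /* The lowest bit of a bitmap UUID records whether that node was
             * primary when the UUID was generated; ch_self/ch_peer count the
             * blocks each side changed while disconnected. */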
2753        self = device->ldev->md.uuid[UI_BITMAP] & 1;
2754        peer = device->p_uuid[UI_BITMAP] & 1;
2755
2756        ch_peer = device->p_uuid[UI_SIZE]; /* peer sends its bitmap weight in the UI_SIZE slot */
2757        ch_self = device->comm_bm_set;
2758
2759        rcu_read_lock();
2760        after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2761        rcu_read_unlock();
2762        switch (after_sb_0p) {
2763        case ASB_CONSENSUS:
2764        case ASB_DISCARD_SECONDARY:
2765        case ASB_CALL_HELPER:
2766        case ASB_VIOLENTLY:
2767                drbd_err(device, "Configuration error.\n");
2768                break;
2769        case ASB_DISCONNECT:
2770                break;
2771        case ASB_DISCARD_YOUNGER_PRI:
2772                if (self == 0 && peer == 1) {
2773                        rv = -1;
2774                        break;
2775                }
2776                if (self == 1 && peer == 0) {
2777                        rv =  1;
2778                        break;
2779                }
2780                /* Else fall through to one of the other strategies... */
2781        case ASB_DISCARD_OLDER_PRI:
2782                if (self == 0 && peer == 1) {
2783                        rv = 1;
2784                        break;
2785                }
2786                if (self == 1 && peer == 0) {
2787                        rv = -1;
2788                        break;
2789                }
2790                /* Else fall through to one of the other strategies... */
2791                drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2792                     "Using discard-least-changes instead\n");
2793        case ASB_DISCARD_ZERO_CHG:
2794                if (ch_peer == 0 && ch_self == 0) {
2795                        rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2796                                ? -1 : 1;
2797                        break;
2798                } else {
2799                        if (ch_peer == 0) { rv =  1; break; }
2800                        if (ch_self == 0) { rv = -1; break; }
2801                }
2802                if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2803                        break;
2804        case ASB_DISCARD_LEAST_CHG:
2805                if      (ch_self < ch_peer)
2806                        rv = -1;
2807                else if (ch_self > ch_peer)
2808                        rv =  1;
2809                else /* ( ch_self == ch_peer ) */
2810                     /* Well, then use something else. */
2811                        rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2812                                ? -1 : 1;
2813                break;
2814        case ASB_DISCARD_LOCAL:
2815                rv = -1;
2816                break;
2817        case ASB_DISCARD_REMOTE:
2818                rv =  1;
2819        }
2820
2821        return rv;
2822}
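
/*
 * How the verdict above is consumed (see drbd_sync_handshake() below):
 *
 *	rv  >  0   sync from this node; we become the SyncSource
 *	rv  <  0   sync from the peer; we become the SyncTarget
 *	rv == -100 still unresolved; the caller drops the connection
 */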
2823
2824/**
2825 * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
2826 */
2827static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
2828{
2829        struct drbd_device *device = peer_device->device;
2830        int hg, rv = -100;
2831        enum drbd_after_sb_p after_sb_1p;
2832
2833        rcu_read_lock();
2834        after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
2835        rcu_read_unlock();
2836        switch (after_sb_1p) {
2837        case ASB_DISCARD_YOUNGER_PRI:
2838        case ASB_DISCARD_OLDER_PRI:
2839        case ASB_DISCARD_LEAST_CHG:
2840        case ASB_DISCARD_LOCAL:
2841        case ASB_DISCARD_REMOTE:
2842        case ASB_DISCARD_ZERO_CHG:
2843                drbd_err(device, "Configuration error.\n");
2844                break;
2845        case ASB_DISCONNECT:
2846                break;
2847        case ASB_CONSENSUS:
2848                hg = drbd_asb_recover_0p(peer_device);
2849                if (hg == -1 && device->state.role == R_SECONDARY)
2850                        rv = hg;
2851                if (hg == 1  && device->state.role == R_PRIMARY)
2852                        rv = hg;
2853                break;
2854        case ASB_VIOLENTLY:
2855                rv = drbd_asb_recover_0p(peer_device);
2856                break;
2857        case ASB_DISCARD_SECONDARY:
2858                return device->state.role == R_PRIMARY ? 1 : -1;
2859        case ASB_CALL_HELPER:
2860                hg = drbd_asb_recover_0p(peer_device);
2861                if (hg == -1 && device->state.role == R_PRIMARY) {
2862                        enum drbd_state_rv rv2;
2863
2864                         /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2865                          * we might be here in C_WF_REPORT_PARAMS which is transient.
2866                          * we do not need to wait for the after state change work either. */
2867                        rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2868                        if (rv2 != SS_SUCCESS) {
2869                                drbd_khelper(device, "pri-lost-after-sb");
2870                        } else {
2871                                drbd_warn(device, "Successfully gave up primary role.\n");
2872                                rv = hg;
2873                        }
2874                } else
2875                        rv = hg;
2876        }
2877
2878        return rv;
2879}
2880
2881/**
2882 * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
2883 */
2884static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
2885{
2886        struct drbd_device *device = peer_device->device;
2887        int hg, rv = -100;
2888        enum drbd_after_sb_p after_sb_2p;
2889
2890        rcu_read_lock();
2891        after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
2892        rcu_read_unlock();
2893        switch (after_sb_2p) {
2894        case ASB_DISCARD_YOUNGER_PRI:
2895        case ASB_DISCARD_OLDER_PRI:
2896        case ASB_DISCARD_LEAST_CHG:
2897        case ASB_DISCARD_LOCAL:
2898        case ASB_DISCARD_REMOTE:
2899        case ASB_CONSENSUS:
2900        case ASB_DISCARD_SECONDARY:
2901        case ASB_DISCARD_ZERO_CHG:
2902                drbd_err(device, "Configuration error.\n");
2903                break;
2904        case ASB_VIOLENTLY:
2905                rv = drbd_asb_recover_0p(peer_device);
2906                break;
2907        case ASB_DISCONNECT:
2908                break;
2909        case ASB_CALL_HELPER:
2910                hg = drbd_asb_recover_0p(peer_device);
2911                if (hg == -1) {
2912                        enum drbd_state_rv rv2;
2913
2914                         /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2915                          * we might be here in C_WF_REPORT_PARAMS which is transient.
2916                          * we do not need to wait for the after state change work either. */
2917                        rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2918                        if (rv2 != SS_SUCCESS) {
2919                                drbd_khelper(device, "pri-lost-after-sb");
2920                        } else {
2921                                drbd_warn(device, "Successfully gave up primary role.\n");
2922                                rv = hg;
2923                        }
2924                } else
2925                        rv = hg;
2926        }
2927
2928        return rv;
2929}
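
/*
 * Which of the three recovery helpers runs is selected by the number of
 * nodes that are currently Primary, as drbd_sync_handshake() does below:
 *
 *	switch ((device->state.role == R_PRIMARY) + (peer_role == R_PRIMARY)) {
 *	case 0: hg = drbd_asb_recover_0p(peer_device); break;
 *	case 1: hg = drbd_asb_recover_1p(peer_device); break;
 *	case 2: hg = drbd_asb_recover_2p(peer_device); break;
 *	}
 */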
2930
2931static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
2932                           u64 bits, u64 flags)
2933{
2934        if (!uuid) {
2935                drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
2936                return;
2937        }
2938        drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2939             text,
2940             (unsigned long long)uuid[UI_CURRENT],
2941             (unsigned long long)uuid[UI_BITMAP],
2942             (unsigned long long)uuid[UI_HISTORY_START],
2943             (unsigned long long)uuid[UI_HISTORY_END],
2944             (unsigned long long)bits,
2945             (unsigned long long)flags);
2946}
2947
2948/*
2949  100   after split brain, try auto recover
2950    2   C_SYNC_SOURCE set BitMap
2951    1   C_SYNC_SOURCE use BitMap
2952    0   no Sync
2953   -1   C_SYNC_TARGET use BitMap
2954   -2   C_SYNC_TARGET set BitMap
2955 -100   after split brain, disconnect
2956-1000   unrelated data
2957-1091   requires proto 91
2958-1096   requires proto 96
2959 */
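/*
 * Worked example for the table above: assume our UUIDs are
 *	current: A  bitmap: B  history: C, D
 * and the peer reports current == B.  Assuming no earlier rule matched,
 * rule 70 below fires (our bitmap UUID equals the peer's current UUID)
 * and we return 1: we become C_SYNC_SOURCE and resync using the
 * existing bitmap.
 */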
2960static int drbd_uuid_compare(struct drbd_device *const device, int *rule_nr) __must_hold(local)
2961{
2962        struct drbd_peer_device *const peer_device = first_peer_device(device);
2963        struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
2964        u64 self, peer;
2965        int i, j;
2966
2967        self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2968        peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2969
2970        *rule_nr = 10;
2971        if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2972                return 0;
2973
2974        *rule_nr = 20;
2975        if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2976             peer != UUID_JUST_CREATED)
2977                return -2;
2978
2979        *rule_nr = 30;
2980        if (self != UUID_JUST_CREATED &&
2981            (peer == UUID_JUST_CREATED || peer == (u64)0))
2982                return 2;
2983
2984        if (self == peer) {
2985                int rct, dc; /* roles at crash time */
2986
2987                if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2988
2989                        if (connection->agreed_pro_version < 91)
2990                                return -1091;
2991
2992                        if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2993                            (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2994                                drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
2995                                drbd_uuid_move_history(device);
2996                                device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
2997                                device->ldev->md.uuid[UI_BITMAP] = 0;
2998
2999                                drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3000                                               device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3001                                *rule_nr = 34;
3002                        } else {
3003                                drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
3004                                *rule_nr = 36;
3005                        }
3006
3007                        return 1;
3008                }
3009
3010                if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
3011
3012                        if (connection->agreed_pro_version < 91)
3013                                return -1091;
3014
3015                        if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
3016                            (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
3017                                drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
3018
3019                                device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
3020                                device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
3021                                device->p_uuid[UI_BITMAP] = 0UL;
3022
3023                                drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3024                                *rule_nr = 35;
3025                        } else {
3026                                drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3027                                *rule_nr = 37;
3028                        }
3029
3030                        return -1;
3031                }
3032
3033                /* Common power [off|failure] */
3034                rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3035                        (device->p_uuid[UI_FLAGS] & 2);
3036                /* lowest bit is set when we were primary,
3037                 * next bit (weight 2) is set when peer was primary */
3038                *rule_nr = 40;
3039
3040                switch (rct) {
3041                case 0: /* !self_pri && !peer_pri */ return 0;
3042                case 1: /*  self_pri && !peer_pri */ return 1;
3043                case 2: /* !self_pri &&  peer_pri */ return -1;
3044                case 3: /*  self_pri &&  peer_pri */
3045                        dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3046                        return dc ? -1 : 1;
3047                }
3048        }
3049
3050        *rule_nr = 50;
3051        peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3052        if (self == peer)
3053                return -1;
3054
3055        *rule_nr = 51;
3056        peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3057        if (self == peer) {
3058                if (connection->agreed_pro_version < 96 ?
3059                    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3060                    (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3061                    peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3062                        /* The last P_SYNC_UUID did not get through. Undo the peer's
3063                           UUID modifications from its last start of resync as sync source. */
3064
3065                        if (connection->agreed_pro_version < 91)
3066                                return -1091;
3067
3068                        device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3069                        device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3070
3071                        drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3072                        drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3073
3074                        return -1;
3075                }
3076        }
3077
3078        *rule_nr = 60;
3079        self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3080        for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3081                peer = device->p_uuid[i] & ~((u64)1);
3082                if (self == peer)
3083                        return -2;
3084        }
3085
3086        *rule_nr = 70;
3087        self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3088        peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3089        if (self == peer)
3090                return 1;
3091
3092        *rule_nr = 71;
3093        self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3094        if (self == peer) {
3095                if (connection->agreed_pro_version < 96 ?
3096                    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3097                    (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3098                    self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3099                        /* The last P_SYNC_UUID did not get through. Undo our own
3100                           UUID modifications from our last start of resync as sync source. */
3101
3102                        if (connection->agreed_pro_version < 91)
3103                                return -1091;
3104
3105                        __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3106                        __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3107
3108                        drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3109                        drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3110                                       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3111
3112                        return 1;
3113                }
3114        }
3115
3116
3117        *rule_nr = 80;
3118        peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3119        for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3120                self = device->ldev->md.uuid[i] & ~((u64)1);
3121                if (self == peer)
3122                        return 2;
3123        }
3124
3125        *rule_nr = 90;
3126        self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3127        peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3128        if (self == peer && self != ((u64)0))
3129                return 100;
3130
3131        *rule_nr = 100;
3132        for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3133                self = device->ldev->md.uuid[i] & ~((u64)1);
3134                for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3135                        peer = device->p_uuid[j] & ~((u64)1);
3136                        if (self == peer)
3137                                return -100;
3138                }
3139        }
3140
3141        return -1000;
3142}
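
/*
 * All comparisons above mask off the lowest UUID bit ("& ~((u64)1)"):
 * that bit carries role information (see "lowest bit is set when we
 * were primary" at rule 40) and is not part of the data generation
 * identity, so e.g. 0x...A31 and 0x...A30 compare as the same UUID.
 */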
3143
3144/* drbd_sync_handshake() returns the new conn state on success, or
3145   C_MASK (-1) on failure.
3146 */
3147static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3148                                           enum drbd_role peer_role,
3149                                           enum drbd_disk_state peer_disk) __must_hold(local)
3150{
3151        struct drbd_device *device = peer_device->device;
3152        enum drbd_conns rv = C_MASK;
3153        enum drbd_disk_state mydisk;
3154        struct net_conf *nc;
3155        int hg, rule_nr, rr_conflict, tentative;
3156
3157        mydisk = device->state.disk;
3158        if (mydisk == D_NEGOTIATING)
3159                mydisk = device->new_state_tmp.disk;
3160
3161        drbd_info(device, "drbd_sync_handshake:\n");
3162
3163        spin_lock_irq(&device->ldev->md.uuid_lock);
3164        drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3165        drbd_uuid_dump(device, "peer", device->p_uuid,
3166                       device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3167
3168        hg = drbd_uuid_compare(device, &rule_nr);
3169        spin_unlock_irq(&device->ldev->md.uuid_lock);
3170
3171        drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3172
3173        if (hg == -1000) {
3174                drbd_alert(device, "Unrelated data, aborting!\n");
3175                return C_MASK;
3176        }
3177        if (hg < -1000) {
3178                drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3179                return C_MASK;
3180        }
3181
3182        if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3183            (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
3184                int f = (hg == -100) || abs(hg) == 2;
3185                hg = mydisk > D_INCONSISTENT ? 1 : -1;
3186                if (f)
3187                        hg = hg*2;
3188                drbd_info(device, "Becoming sync %s due to disk states.\n",
3189                     hg > 0 ? "source" : "target");
3190        }
3191
3192        if (abs(hg) == 100)
3193                drbd_khelper(device, "initial-split-brain");
3194
3195        rcu_read_lock();
3196        nc = rcu_dereference(peer_device->connection->net_conf);
3197
3198        if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3199                int pcount = (device->state.role == R_PRIMARY)
3200                           + (peer_role == R_PRIMARY);
3201                int forced = (hg == -100);
3202
3203                switch (pcount) {
3204                case 0:
3205                        hg = drbd_asb_recover_0p(peer_device);
3206                        break;
3207                case 1:
3208                        hg = drbd_asb_recover_1p(peer_device);
3209                        break;
3210                case 2:
3211                        hg = drbd_asb_recover_2p(peer_device);
3212                        break;
3213                }
3214                if (abs(hg) < 100) {
3215                        drbd_warn(device, "Split-Brain detected, %d primaries, "
3216                             "automatically solved. Sync from %s node\n",
3217                             pcount, (hg < 0) ? "peer" : "this");
3218                        if (forced) {
3219                                drbd_warn(device, "Doing a full sync, since"
3220                                     " UUIDs where ambiguous.\n");
3221                                hg = hg*2;
3222                        }
3223                }
3224        }
3225
3226        if (hg == -100) {
3227                if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3228                        hg = -1;
3229                if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3230                        hg = 1;
3231
3232                if (abs(hg) < 100)
3233                        drbd_warn(device, "Split-Brain detected, manually solved. "
3234                             "Sync from %s node\n",
3235                             (hg < 0) ? "peer" : "this");
3236        }
3237        rr_conflict = nc->rr_conflict;
3238        tentative = nc->tentative;
3239        rcu_read_unlock();
3240
3241        if (hg == -100) {
3242                /* FIXME this log message is not correct if we end up here
3243                 * after an attempted attach on a diskless node.
3244                 * We just refuse to attach -- well, we drop the "connection"
3245                 * to that disk, in a way... */
3246                drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3247                drbd_khelper(device, "split-brain");
3248                return C_MASK;
3249        }
3250
3251        if (hg > 0 && mydisk <= D_INCONSISTENT) {
3252                drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3253                return C_MASK;
3254        }
3255
3256        if (hg < 0 && /* by intention we do not use mydisk here. */
3257            device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3258                switch (rr_conflict) {
3259                case ASB_CALL_HELPER:
3260                        drbd_khelper(device, "pri-lost");
3261                        /* fall through */
3262                case ASB_DISCONNECT:
3263                        drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3264                        return C_MASK;
3265                case ASB_VIOLENTLY:
3266                        drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3267                             "assumption\n");
3268                }
3269        }
3270
3271        if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3272                if (hg == 0)
3273                        drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3274                else
3275                        drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3276                                 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3277                                 abs(hg) >= 2 ? "full" : "bit-map based");
3278                return C_MASK;
3279        }
3280
3281        if (abs(hg) >= 2) {
3282                drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3283                if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3284                                        BM_LOCKED_SET_ALLOWED))
3285                        return C_MASK;
3286        }
3287
3288        if (hg > 0) { /* become sync source. */
3289                rv = C_WF_BITMAP_S;
3290        } else if (hg < 0) { /* become sync target */
3291                rv = C_WF_BITMAP_T;
3292        } else {
3293                rv = C_CONNECTED;
3294                if (drbd_bm_total_weight(device)) {
3295                        drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3296                             drbd_bm_total_weight(device));
3297                }
3298        }
3299
3300        return rv;
3301}
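
/*
 * Summary of the handshake outcome, as implemented above:
 *
 *	hg         resulting conn state   resync
 *	------     --------------------   -------------------------------
 *	 > 0       C_WF_BITMAP_S          this node is the source
 *	 < 0       C_WF_BITMAP_T          this node is the target
 *	== 0       C_CONNECTED            none
 *	|hg| >= 2  (as above)             full sync: all bitmap bits set first
 */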
3302
3303static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3304{
3305        /* ASB_DISCARD_REMOTE here paired with ASB_DISCARD_LOCAL on the peer is valid */
3306        if (peer == ASB_DISCARD_REMOTE)
3307                return ASB_DISCARD_LOCAL;
3308
3309        /* any other pairing involving ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL is invalid */
3310        if (peer == ASB_DISCARD_LOCAL)
3311                return ASB_DISCARD_REMOTE;
3312
3313        /* everything else is valid if they are equal on both sides. */
3314        return peer;
3315}
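
/*
 * Example: a peer configured with "after-sb-0pri discard-remote" sends
 * ASB_DISCARD_REMOTE; convert_after_sb() maps that to ASB_DISCARD_LOCAL,
 * which is exactly what a compatible local setting of "discard-local"
 * contains, so receive_protocol() below can verify both sides with a
 * plain equality test.
 */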
3316
3317static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3318{
3319        struct p_protocol *p = pi->data;
3320        enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3321        int p_proto, p_discard_my_data, p_two_primaries, cf;
3322        struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3323        char integrity_alg[SHARED_SECRET_MAX] = "";
3324        struct crypto_ahash *peer_integrity_tfm = NULL;
3325        void *int_dig_in = NULL, *int_dig_vv = NULL;
3326
3327        p_proto         = be32_to_cpu(p->protocol);
3328        p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3329        p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3330        p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3331        p_two_primaries = be32_to_cpu(p->two_primaries);
3332        cf              = be32_to_cpu(p->conn_flags);
3333        p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3334
3335        if (connection->agreed_pro_version >= 87) {
3336                int err;
3337
3338                if (pi->size > sizeof(integrity_alg))
3339                        return -EIO;
3340                err = drbd_recv_all(connection, integrity_alg, pi->size);
3341                if (err)
3342                        return err;
3343                integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3344        }
3345
3346        if (pi->cmd != P_PROTOCOL_UPDATE) {
3347                clear_bit(CONN_DRY_RUN, &connection->flags);
3348
3349                if (cf & CF_DRY_RUN)
3350                        set_bit(CONN_DRY_RUN, &connection->flags);
3351
3352                rcu_read_lock();
3353                nc = rcu_dereference(connection->net_conf);
3354
3355                if (p_proto != nc->wire_protocol) {
3356                        drbd_err(connection, "incompatible %s settings\n", "protocol");
3357                        goto disconnect_rcu_unlock;
3358                }
3359
3360                if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3361                        drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3362                        goto disconnect_rcu_unlock;
3363                }
3364
3365                if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3366                        drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3367                        goto disconnect_rcu_unlock;
3368                }
3369
3370                if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3371                        drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3372                        goto disconnect_rcu_unlock;
3373                }
3374
3375                if (p_discard_my_data && nc->discard_my_data) {
3376                        drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3377                        goto disconnect_rcu_unlock;
3378                }
3379
3380                if (p_two_primaries != nc->two_primaries) {
3381                        drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3382                        goto disconnect_rcu_unlock;
3383                }
3384
3385                if (strcmp(integrity_alg, nc->integrity_alg)) {
3386                        drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3387                        goto disconnect_rcu_unlock;
3388                }
3389
3390                rcu_read_unlock();
3391        }
3392
3393        if (integrity_alg[0]) {
3394                int hash_size;
3395
3396                /*
3397                 * We can only change the peer data integrity algorithm
3398                 * here.  Changing our own data integrity algorithm
3399                 * requires that we send a P_PROTOCOL_UPDATE packet at
3400                 * the same time; otherwise, the peer has no way to
3401                 * tell between which packets the algorithm should
3402                 * change.
3403                 */
3404
3405                peer_integrity_tfm = crypto_alloc_ahash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3406                if (!peer_integrity_tfm) {
3407                        drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3408                                 integrity_alg);
3409                        goto disconnect;
3410                }
3411
3412                hash_size = crypto_ahash_digestsize(peer_integrity_tfm);
3413                int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3414                int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3415                if (!(int_dig_in && int_dig_vv)) {
3416                        drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3417                        goto disconnect;
3418                }
3419        }
3420
3421        new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3422        if (!new_net_conf) {
3423                drbd_err(connection, "Allocation of new net_conf failed\n");
3424                goto disconnect;
3425        }
3426
3427        mutex_lock(&connection->data.mutex);
3428        mutex_lock(&connection->resource->conf_update);
3429        old_net_conf = connection->net_conf;
3430        *new_net_conf = *old_net_conf;
3431
3432        new_net_conf->wire_protocol = p_proto;
3433        new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3434        new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3435        new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3436        new_net_conf->two_primaries = p_two_primaries;
3437
3438        rcu_assign_pointer(connection->net_conf, new_net_conf);
3439        mutex_unlock(&connection->resource->conf_update);
3440        mutex_unlock(&connection->data.mutex);
3441
3442        crypto_free_ahash(connection->peer_integrity_tfm);
3443        kfree(connection->int_dig_in);
3444        kfree(connection->int_dig_vv);
3445        connection->peer_integrity_tfm = peer_integrity_tfm;
3446        connection->int_dig_in = int_dig_in;
3447        connection->int_dig_vv = int_dig_vv;
3448
3449        if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3450                drbd_info(connection, "peer data-integrity-alg: %s\n",
3451                          integrity_alg[0] ? integrity_alg : "(none)");
3452
3453        synchronize_rcu();
3454        kfree(old_net_conf);
3455        return 0;
3456
3457disconnect_rcu_unlock:
3458        rcu_read_unlock();
3459disconnect:
3460        crypto_free_ahash(peer_integrity_tfm);
3461        kfree(int_dig_in);
3462        kfree(int_dig_vv);
3463        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3464        return -EIO;
3465}
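
/*
 * The net_conf replacement above follows the standard RCU publish
 * pattern; a generic sketch (names illustrative, not driver code):
 */
#if 0
	new = kmalloc(sizeof(*new), GFP_KERNEL);
	*new = *old;				/* copy the current config */
	new->field = updated_value;		/* modify the private copy */
	rcu_assign_pointer(shared_ptr, new);	/* publish atomically */
	synchronize_rcu();			/* wait out readers of "old" */
	kfree(old);				/* now safe to free */
#endif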
3466
3467/* helper function
3468 * input: alg name, feature name
3469 * return: NULL (alg name was "")
3470 *         ERR_PTR(error) if something goes wrong
3471 *         or the crypto hash ptr, if it worked out ok. */
3472static struct crypto_ahash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3473                const char *alg, const char *name)
3474{
3475        struct crypto_ahash *tfm;
3476
3477        if (!alg[0])
3478                return NULL;
3479
3480        tfm = crypto_alloc_ahash(alg, 0, CRYPTO_ALG_ASYNC);
3481        if (IS_ERR(tfm)) {
3482                drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3483                        alg, name, PTR_ERR(tfm));
3484                return tfm;
3485        }
3486        return tfm;
3487}
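
/*
 * Callers must distinguish all three outcomes; the calling convention,
 * as used by receive_SyncParam() below:
 *
 *	tfm = drbd_crypto_alloc_digest_safe(device, alg, "verify-alg");
 *	if (IS_ERR(tfm)) {
 *		tfm = NULL;	/* never pass an ERR_PTR to crypto_free_ahash() */
 *		goto disconnect;
 *	}
 *	(tfm == NULL simply means no algorithm was configured, alg == "")
 */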
3488
3489static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3490{
3491        void *buffer = connection->data.rbuf;
3492        int size = pi->size;
3493
3494        while (size) {
3495                int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3496                s = drbd_recv(connection, buffer, s);
3497                if (s <= 0) {
3498                        if (s < 0)
3499                                return s;
3500                        break;
3501                }
3502                size -= s;
3503        }
3504        if (size)
3505                return -EIO;
3506        return 0;
3507}
3508
3509/*
3510 * config_unknown_volume  -  device configuration command for unknown volume
3511 *
3512 * When a device is added to an existing connection, the node on which the
3513 * device is added first will send configuration commands to its peer but the
3514 * peer will not know about the device yet.  It will warn and ignore these
3515 * commands.  Once the device is added on the second node, the second node will
3516 * send the same device configuration commands, but in the other direction.
3517 *
3518 * (We can also end up here if drbd is misconfigured.)
3519 */
3520static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3521{
3522        drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3523                  cmdname(pi->cmd), pi->vnr);
3524        return ignore_remaining_packet(connection, pi);
3525}
3526
3527static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3528{
3529        struct drbd_peer_device *peer_device;
3530        struct drbd_device *device;
3531        struct p_rs_param_95 *p;
3532        unsigned int header_size, data_size, exp_max_sz;
3533        struct crypto_ahash *verify_tfm = NULL;
3534        struct crypto_ahash *csums_tfm = NULL;
3535        struct net_conf *old_net_conf, *new_net_conf = NULL;
3536        struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3537        const int apv = connection->agreed_pro_version;
3538        struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3539        int fifo_size = 0;
3540        int err;
3541
3542        peer_device = conn_peer_device(connection, pi->vnr);
3543        if (!peer_device)
3544                return config_unknown_volume(connection, pi);
3545        device = peer_device->device;
3546
3547        exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3548                    : apv == 88 ? sizeof(struct p_rs_param)
3549                                        + SHARED_SECRET_MAX
3550                    : apv <= 94 ? sizeof(struct p_rs_param_89)
3551                    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3552
3553        if (pi->size > exp_max_sz) {
3554                drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3555                    pi->size, exp_max_sz);
3556                return -EIO;
3557        }
3558
3559        if (apv <= 88) {
3560                header_size = sizeof(struct p_rs_param);
3561                data_size = pi->size - header_size;
3562        } else if (apv <= 94) {
3563                header_size = sizeof(struct p_rs_param_89);
3564                data_size = pi->size - header_size;
3565                D_ASSERT(device, data_size == 0);
3566        } else {
3567                header_size = sizeof(struct p_rs_param_95);
3568                data_size = pi->size - header_size;
3569                D_ASSERT(device, data_size == 0);
3570        }
3571
3572        /* initialize verify_alg and csums_alg (adjacent fields in p, hence one memset) */
3573        p = pi->data;
3574        memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3575
3576        err = drbd_recv_all(peer_device->connection, p, header_size);
3577        if (err)
3578                return err;
3579
3580        mutex_lock(&connection->resource->conf_update);
3581        old_net_conf = peer_device->connection->net_conf;
3582        if (get_ldev(device)) {
3583                new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3584                if (!new_disk_conf) {
3585                        put_ldev(device);
3586                        mutex_unlock(&connection->resource->conf_update);
3587                        drbd_err(device, "Allocation of new disk_conf failed\n");
3588                        return -ENOMEM;
3589                }
3590
3591                old_disk_conf = device->ldev->disk_conf;
3592                *new_disk_conf = *old_disk_conf;
3593
3594                new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3595        }
3596
3597        if (apv >= 88) {
3598                if (apv == 88) {
3599                        if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3600                                drbd_err(device, "verify-alg of wrong size, "
3601                                        "peer wants %u, accepting only up to %u byte\n",
3602                                        data_size, SHARED_SECRET_MAX);
3603                                err = -EIO;
3604                                goto reconnect;
3605                        }
3606
3607                        err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3608                        if (err)
3609                                goto reconnect;
3610                        /* we expect NUL terminated string */
3611                        /* but just in case someone tries to be evil */
3612                        D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3613                        p->verify_alg[data_size-1] = 0;
3614
3615                } else /* apv >= 89 */ {
3616                        /* we still expect NUL terminated strings */
3617                        /* but just in case someone tries to be evil */
3618                        D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3619                        D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3620                        p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3621                        p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3622                }
3623
3624                if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3625                        if (device->state.conn == C_WF_REPORT_PARAMS) {
3626                                drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3627                                    old_net_conf->verify_alg, p->verify_alg);
3628                                goto disconnect;
3629                        }
3630                        verify_tfm = drbd_crypto_alloc_digest_safe(device,
3631                                        p->verify_alg, "verify-alg");
3632                        if (IS_ERR(verify_tfm)) {
3633                                verify_tfm = NULL;
3634                                goto disconnect;
3635                        }
3636                }
3637
3638                if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3639                        if (device->state.conn == C_WF_REPORT_PARAMS) {
3640                                drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3641                                    old_net_conf->csums_alg, p->csums_alg);
3642                                goto disconnect;
3643                        }
3644                        csums_tfm = drbd_crypto_alloc_digest_safe(device,
3645                                        p->csums_alg, "csums-alg");
3646                        if (IS_ERR(csums_tfm)) {
3647                                csums_tfm = NULL;
3648                                goto disconnect;
3649                        }
3650                }
3651
3652                if (apv > 94 && new_disk_conf) {
3653                        new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3654                        new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3655                        new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3656                        new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3657
3658                        fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3659                        if (fifo_size != device->rs_plan_s->size) {
3660                                new_plan = fifo_alloc(fifo_size);
3661                                if (!new_plan) {
3662                                        drbd_err(device, "kmalloc of fifo_buffer failed");
3663                                        put_ldev(device);
3664                                        goto disconnect;
3665                                }
3666                        }
3667                }
3668
3669                if (verify_tfm || csums_tfm) {
3670                        new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3671                        if (!new_net_conf) {
3672                                drbd_err(device, "Allocation of new net_conf failed\n");
3673                                goto disconnect;
3674                        }
3675
3676                        *new_net_conf = *old_net_conf;
3677
3678                        if (verify_tfm) {
3679                                strcpy(new_net_conf->verify_alg, p->verify_alg);
3680                                new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3681                                crypto_free_ahash(peer_device->connection->verify_tfm);
3682                                peer_device->connection->verify_tfm = verify_tfm;
3683                                drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3684                        }
3685                        if (csums_tfm) {
3686                                strcpy(new_net_conf->csums_alg, p->csums_alg);
3687                                new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3688                                crypto_free_ahash(peer_device->connection->csums_tfm);
3689                                peer_device->connection->csums_tfm = csums_tfm;
3690                                drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3691                        }
3692                        rcu_assign_pointer(connection->net_conf, new_net_conf);
3693                }
3694        }
3695
3696        if (new_disk_conf) {
3697                rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3698                put_ldev(device);
3699        }
3700
3701        if (new_plan) {
3702                old_plan = device->rs_plan_s;
3703                rcu_assign_pointer(device->rs_plan_s, new_plan);
3704        }
3705
3706        mutex_unlock(&connection->resource->conf_update);
3707        synchronize_rcu();
3708        if (new_net_conf)
3709                kfree(old_net_conf);
3710        kfree(old_disk_conf);
3711        kfree(old_plan);
3712
3713        return 0;
3714
3715reconnect:
3716        if (new_disk_conf) {
3717                put_ldev(device);
3718                kfree(new_disk_conf);
3719        }
3720        mutex_unlock(&connection->resource->conf_update);
3721        return -EIO;
3722
3723disconnect:
3724        kfree(new_plan);
3725        if (new_disk_conf) {
3726                put_ldev(device);
3727                kfree(new_disk_conf);
3728        }
3729        mutex_unlock(&connection->resource->conf_update);
3730        /* just for completeness: actually not needed,
3731         * as this is not reached if csums_tfm was ok. */
3732        crypto_free_ahash(csums_tfm);
3733        /* but free the verify_tfm again, if csums_tfm did not work out */
3734        crypto_free_ahash(verify_tfm);
3735        conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3736        return -EIO;
3737}
3738
3739/* warn if the arguments differ by more than 12.5% */
3740static void warn_if_differ_considerably(struct drbd_device *device,
3741        const char *s, sector_t a, sector_t b)
3742{
3743        sector_t d;
3744        if (a == 0 || b == 0)
3745                return;
3746        d = (a > b) ? (a - b) : (b - a);
3747        if (d > (a>>3) || d > (b>>3))
3748                drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3749                     (unsigned long long)a, (unsigned long long)b);
3750}
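
/*
 * Worked example: a = 1000 and b = 880 sectors give d = 120;
 * a>>3 = 125 and b>>3 = 110, so d > b>>3 and we warn.  The effective
 * threshold is one eighth (12.5%) of the smaller of the two values.
 */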
3751
3752static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3753{
3754        struct drbd_peer_device *peer_device;
3755        struct drbd_device *device;
3756        struct p_sizes *p = pi->data;
3757        enum determine_dev_size dd = DS_UNCHANGED;
3758        sector_t p_size, p_usize, p_csize, my_usize;
3759        int ldsc = 0; /* local disk size changed */
3760        enum dds_flags ddsf;
3761
3762        peer_device = conn_peer_device(connection, pi->vnr);
3763        if (!peer_device)
3764                return config_unknown_volume(connection, pi);
3765        device = peer_device->device;
3766
3767        p_size = be64_to_cpu(p->d_size);
3768        p_usize = be64_to_cpu(p->u_size);
3769        p_csize = be64_to_cpu(p->c_size);
3770
3771        /* just store the peer's disk size for now.
3772         * we still need to figure out whether we accept that. */
3773        device->p_size = p_size;
3774
3775        if (get_ldev(device)) {
3776                rcu_read_lock();
3777                my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
3778                rcu_read_unlock();
3779
3780                warn_if_differ_considerably(device, "lower level device sizes",
3781                           p_size, drbd_get_max_capacity(device->ldev));
3782                warn_if_differ_considerably(device, "user requested size",
3783                                            p_usize, my_usize);
3784
3785                /* if this is the first connect, or an otherwise expected
3786                 * param exchange, choose the minimum */
3787                if (device->state.conn == C_WF_REPORT_PARAMS)
3788                        p_usize = min_not_zero(my_usize, p_usize);
3789
3790                /* Never shrink a device with usable data during connect.
3791                   But allow online shrinking if we are connected. */
3792                if (drbd_new_dev_size(device, device->ldev, p_usize, 0) <
3793                    drbd_get_capacity(device->this_bdev) &&
3794                    device->state.disk >= D_OUTDATED &&
3795                    device->state.conn < C_CONNECTED) {
3796                        drbd_err(device, "The peer's disk size is too small!\n");
3797                        conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3798                        put_ldev(device);
3799                        return -EIO;
3800                }
3801
3802                if (my_usize != p_usize) {
3803                        struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3804
3805                        new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3806                        if (!new_disk_conf) {
3807                                drbd_err(device, "Allocation of new disk_conf failed\n");
3808                                put_ldev(device);
3809                                return -ENOMEM;
3810                        }
3811
3812                        mutex_lock(&connection->resource->conf_update);
3813                        old_disk_conf = device->ldev->disk_conf;
3814                        *new_disk_conf = *old_disk_conf;
3815                        new_disk_conf->disk_size = p_usize;
3816
3817                        rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3818                        mutex_unlock(&connection->resource->conf_update);
3819                        synchronize_rcu();
3820                        kfree(old_disk_conf);
3821
3822                        drbd_info(device, "Peer sets u_size to %lu sectors\n",
3823                                 (unsigned long)my_usize);
3824                }
3825
3826                put_ldev(device);
3827        }
3828
3829        device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3830        /* Keep the call to drbd_reconsider_max_bio_size() before drbd_determine_dev_size().
3831           In case we cleared the QUEUE_FLAG_DISCARD from our queue in
3832           drbd_reconsider_max_bio_size(), we can be sure that after
3833           drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
3834
3835        ddsf = be16_to_cpu(p->dds_flags);
3836        if (get_ldev(device)) {
3837                drbd_reconsider_max_bio_size(device, device->ldev);
3838                dd = drbd_determine_dev_size(device, ddsf, NULL);
3839                put_ldev(device);
3840                if (dd == DS_ERROR)
3841                        return -EIO;
3842                drbd_md_sync(device);
3843        } else {
3844                /*
3845                 * I am diskless, need to accept the peer's *current* size.
3846                 * I must NOT accept the peer's backing disk size,
3847                 * it may have been larger than mine all along...
3848                 *
3849                 * At this point, the peer knows more about my disk, or at
3850                 * least about what we last agreed upon, than myself.
3851                 * So if his c_size is less than his d_size, the most likely
3852                 * reason is that *my* d_size was smaller last time we checked.
3853                 *
3854                 * However, if he sends a zero current size,
3855                 * take his (user-capped or) backing disk size anyways.
3856                 */
3857                drbd_reconsider_max_bio_size(device, NULL);
3858                drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size);
3859        }
3860
3861        if (get_ldev(device)) {
3862                if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
3863                        device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
3864                        ldsc = 1;
3865                }
3866
3867                put_ldev(device);
3868        }
3869
3870        if (device->state.conn > C_WF_REPORT_PARAMS) {
3871                if (be64_to_cpu(p->c_size) !=
3872                    drbd_get_capacity(device->this_bdev) || ldsc) {
3873                        /* we have different sizes, probably peer
3874                         * needs to know my new size... */
3875                        drbd_send_sizes(peer_device, 0, ddsf);
3876                }
3877                if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
3878                    (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
3879                        if (device->state.pdsk >= D_INCONSISTENT &&
3880                            device->state.disk >= D_INCONSISTENT) {
3881                                if (ddsf & DDSF_NO_RESYNC)
3882                                        drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
3883                                else
3884                                        resync_after_online_grow(device);
3885                        } else
3886                                set_bit(RESYNC_AFTER_NEG, &device->flags);
3887                }
3888        }
3889
3890        return 0;
3891}
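
/*
 * Note on the diskless branch above: "p_csize ?: p_usize ?: p_size"
 * uses GCC's "a ?: b" shorthand for "a ? a : b", so the first non-zero
 * of the peer's current size, user-capped size and backing disk size
 * wins; e.g. p_csize == 0, p_usize == 0, p_size == 8192 sets our
 * capacity to 8192 sectors.
 */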
3892
3893static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
3894{
3895        struct drbd_peer_device *peer_device;
3896        struct drbd_device *device;
3897        struct p_uuids *p = pi->data;
3898        u64 *p_uuid;
3899        int i, updated_uuids = 0;
3900
3901        peer_device = conn_peer_device(connection, pi->vnr);
3902        if (!peer_device)
3903                return config_unknown_volume(connection, pi);
3904        device = peer_device->device;
3905
3906        p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3907        if (!p_uuid) {
3908                drbd_err(device, "kmalloc of p_uuid failed\n");
3909                return -ENOMEM;
3910        }
3911
3912        for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3913                p_uuid[i] = be64_to_cpu(p->uuid[i]);
3914
3915        kfree(device->p_uuid);
3916        device->p_uuid = p_uuid;
3917
3918        if (device->state.conn < C_CONNECTED &&
3919            device->state.disk < D_INCONSISTENT &&
3920            device->state.role == R_PRIMARY &&
3921            (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3922                drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
3923                    (unsigned long long)device->ed_uuid);
3924                conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3925                return -EIO;
3926        }
3927
3928        if (get_ldev(device)) {
3929                int skip_initial_sync =
3930                        device->state.conn == C_CONNECTED &&
3931                        peer_device->connection->agreed_pro_version >= 90 &&
3932                        device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3933                        (p_uuid[UI_FLAGS] & 8);
3934                if (skip_initial_sync) {
3935                        drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
3936                        drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
3937                                        "clear_n_write from receive_uuids",
3938                                        BM_LOCKED_TEST_ALLOWED);
3939                        _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
3940                        _drbd_uuid_set(device, UI_BITMAP, 0);
3941                        _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3942                                        CS_VERBOSE, NULL);
3943                        drbd_md_sync(device);
3944                        updated_uuids = 1;
3945                }
3946                put_ldev(device);
3947        } else if (device->state.disk < D_INCONSISTENT &&
3948                   device->state.role == R_PRIMARY) {
3949                /* I am a diskless primary, the peer just created a new current UUID
3950                   for me. */
3951                updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3952        }
3953
3954        /* Before we test the disk state, wait until any cluster-wide state
3955           change that may still be in progress has finished. That is important
3956           if we are primary and are detaching from our disk: we need to see
3957           the new disk state... */
3958        mutex_lock(device->state_mutex);
3959        mutex_unlock(device->state_mutex);
3960        if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
3961                updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3962
3963        if (updated_uuids)
3964                drbd_print_uuids(device, "receiver updated UUIDs to");
3965
3966        return 0;
3967}
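
/*
 * p_uuid[UI_FLAGS] bits, as consumed in this file:
 *	1	peer has "discard my data" set (split-brain tie break above)
 *	2	peer was a crashed primary (rule 40 in drbd_uuid_compare())
 *	4	peer's disk is inconsistent while negotiating (receive_state())
 *	8	peer wants to skip the initial sync (receive_uuids() above)
 */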
3968
3969/**
3970 * convert_state() - Converts the peer's view of the cluster state to our point of view
3971 * @ps:         The state as seen by the peer.
3972 */
3973static union drbd_state convert_state(union drbd_state ps)
3974{
3975        union drbd_state ms;
3976
3977        static enum drbd_conns c_tab[] = {
3978                [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3979                [C_CONNECTED] = C_CONNECTED,
3980
3981                [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3982                [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3983                [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3984                [C_VERIFY_S]       = C_VERIFY_T,
3985                [C_MASK]   = C_MASK,
3986        };
3987
3988        ms.i = ps.i;
3989
3990        ms.conn = c_tab[ps.conn];
3991        ms.peer = ps.role;
3992        ms.role = ps.peer;
3993        ms.pdsk = ps.disk;
3994        ms.disk = ps.pdsk;
3995        ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3996
3997        return ms;
3998}
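
/* Editor's note: a small illustration of the perspective flip performed by
 * convert_state() above; the field values are invented for the example.
 * If the peer reports "I am Primary with an UpToDate disk and I see you as
 * Secondary with an Inconsistent disk", the same situation from our point
 * of view reads:
 *
 *	union drbd_state ps, ms;
 *	ps.i = 0;
 *	ps.conn = C_CONNECTED;
 *	ps.role = R_PRIMARY;    ps.peer = R_SECONDARY;
 *	ps.disk = D_UP_TO_DATE; ps.pdsk = D_INCONSISTENT;
 *	after ms = convert_state(ps):
 *	ms.role == R_SECONDARY, ms.peer == R_PRIMARY,
 *	ms.disk == D_INCONSISTENT, ms.pdsk == D_UP_TO_DATE
 *
 * Connection states that are inherently asymmetric (StartingSyncS/T,
 * VerifyS/T) are mirrored through c_tab for the same reason.
 */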
3999
4000static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4001{
4002        struct drbd_peer_device *peer_device;
4003        struct drbd_device *device;
4004        struct p_req_state *p = pi->data;
4005        union drbd_state mask, val;
4006        enum drbd_state_rv rv;
4007
4008        peer_device = conn_peer_device(connection, pi->vnr);
4009        if (!peer_device)
4010                return -EIO;
4011        device = peer_device->device;
4012
4013        mask.i = be32_to_cpu(p->mask);
4014        val.i = be32_to_cpu(p->val);
4015
4016        if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4017            mutex_is_locked(device->state_mutex)) {
4018                drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4019                return 0;
4020        }
4021
4022        mask = convert_state(mask);
4023        val = convert_state(val);
4024
4025        rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4026        drbd_send_sr_reply(peer_device, rv);
4027
4028        drbd_md_sync(device);
4029
4030        return 0;
4031}
4032
4033static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4034{
4035        struct p_req_state *p = pi->data;
4036        union drbd_state mask, val;
4037        enum drbd_state_rv rv;
4038
4039        mask.i = be32_to_cpu(p->mask);
4040        val.i = be32_to_cpu(p->val);
4041
4042        if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4043            mutex_is_locked(&connection->cstate_mutex)) {
4044                conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4045                return 0;
4046        }
4047
4048        mask = convert_state(mask);
4049        val = convert_state(val);
4050
4051        rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4052        conn_send_sr_reply(connection, rv);
4053
4054        return 0;
4055}
4056
4057static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4058{
4059        struct drbd_peer_device *peer_device;
4060        struct drbd_device *device;
4061        struct p_state *p = pi->data;
4062        union drbd_state os, ns, peer_state;
4063        enum drbd_disk_state real_peer_disk;
4064        enum chg_state_flags cs_flags;
4065        int rv;
4066
4067        peer_device = conn_peer_device(connection, pi->vnr);
4068        if (!peer_device)
4069                return config_unknown_volume(connection, pi);
4070        device = peer_device->device;
4071
4072        peer_state.i = be32_to_cpu(p->state);
4073
4074        real_peer_disk = peer_state.disk;
4075        if (peer_state.disk == D_NEGOTIATING) {
4076                real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4077                drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4078        }
4079
4080        spin_lock_irq(&device->resource->req_lock);
4081 retry:
4082        os = ns = drbd_read_state(device);
4083        spin_unlock_irq(&device->resource->req_lock);
4084
4085        /* If some other part of the code (ack_receiver thread, timeout)
4086         * already decided to close the connection again,
4087         * we must not "re-establish" it here. */
4088        if (os.conn <= C_TEAR_DOWN)
4089                return -ECONNRESET;
4090
4091        /* If this is the "end of sync" confirmation, usually the peer disk
4092         * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4093         * set) resync started in PausedSyncT, or if the timing of pause-/
4094         * unpause-sync events has been "just right", the peer disk may
4095         * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4096         */
4097        if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4098            real_peer_disk == D_UP_TO_DATE &&
4099            os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4100                /* If we are (becoming) SyncSource, but peer is still in sync
4101                 * preparation, ignore its uptodate-ness to avoid flapping, it
4102                 * will change to inconsistent once the peer reaches active
4103                 * syncing states.
4104                 * It may have changed syncer-paused flags, however, so we
4105                 * cannot ignore this completely. */
4106                if (peer_state.conn > C_CONNECTED &&
4107                    peer_state.conn < C_SYNC_SOURCE)
4108                        real_peer_disk = D_INCONSISTENT;
4109
4110                /* if peer_state changes to connected at the same time,
4111                 * it explicitly notifies us that it finished resync.
4112                 * Maybe we should finish it up, too? */
4113                else if (os.conn >= C_SYNC_SOURCE &&
4114                         peer_state.conn == C_CONNECTED) {
4115                        if (drbd_bm_total_weight(device) <= device->rs_failed)
4116                                drbd_resync_finished(device);
4117                        return 0;
4118                }
4119        }
4120
4121        /* explicit verify finished notification, stop sector reached. */
4122        if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4123            peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4124                ov_out_of_sync_print(device);
4125                drbd_resync_finished(device);
4126                return 0;
4127        }
4128
4129        /* peer says his disk is inconsistent, while we think it is uptodate,
4130         * and this happens while the peer still thinks we have a sync going on,
4131         * but we think we are already done with the sync.
4132         * We ignore this to avoid flapping pdsk.
4133         * This should not happen if the peer is a recent version of drbd. */
4134        if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4135            os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4136                real_peer_disk = D_UP_TO_DATE;
4137
4138        if (ns.conn == C_WF_REPORT_PARAMS)
4139                ns.conn = C_CONNECTED;
4140
4141        if (peer_state.conn == C_AHEAD)
4142                ns.conn = C_BEHIND;
4143
4144        if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4145            get_ldev_if_state(device, D_NEGOTIATING)) {
4146                int cr; /* consider resync */
4147
4148                /* if we established a new connection */
4149                cr  = (os.conn < C_CONNECTED);
4150                /* if we had an established connection
4151                 * and one of the nodes newly attaches a disk */
4152                cr |= (os.conn == C_CONNECTED &&
4153                       (peer_state.disk == D_NEGOTIATING ||
4154                        os.disk == D_NEGOTIATING));
4155                /* if we have both been inconsistent, and the peer has been
4156                 * forced to be UpToDate with --overwrite-data */
4157                cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4158                /* if we had been plain connected, and the admin requested to
4159                 * start a sync by "invalidate" or "invalidate-remote" */
4160                cr |= (os.conn == C_CONNECTED &&
4161                                (peer_state.conn >= C_STARTING_SYNC_S &&
4162                                 peer_state.conn <= C_WF_BITMAP_T));
4163
4164                if (cr)
4165                        ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4166
4167                put_ldev(device);
4168                if (ns.conn == C_MASK) {
4169                        ns.conn = C_CONNECTED;
4170                        if (device->state.disk == D_NEGOTIATING) {
4171                                drbd_force_state(device, NS(disk, D_FAILED));
4172                        } else if (peer_state.disk == D_NEGOTIATING) {
4173                                drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4174                                peer_state.disk = D_DISKLESS;
4175                                real_peer_disk = D_DISKLESS;
4176                        } else {
4177                                if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4178                                        return -EIO;
4179                                D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4180                                conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4181                                return -EIO;
4182                        }
4183                }
4184        }
4185
4186        spin_lock_irq(&device->resource->req_lock);
4187        if (os.i != drbd_read_state(device).i)
4188                goto retry;
4189        clear_bit(CONSIDER_RESYNC, &device->flags);
4190        ns.peer = peer_state.role;
4191        ns.pdsk = real_peer_disk;
4192        ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4193        if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4194                ns.disk = device->new_state_tmp.disk;
4195        cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4196        if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4197            test_bit(NEW_CUR_UUID, &device->flags)) {
4198                /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4199                   for temporary network outages! */
4200                spin_unlock_irq(&device->resource->req_lock);
4201                drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4202                tl_clear(peer_device->connection);
4203                drbd_uuid_new_current(device);
4204                clear_bit(NEW_CUR_UUID, &device->flags);
4205                conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4206                return -EIO;
4207        }
4208        rv = _drbd_set_state(device, ns, cs_flags, NULL);
4209        ns = drbd_read_state(device);
4210        spin_unlock_irq(&device->resource->req_lock);
4211
4212        if (rv < SS_SUCCESS) {
4213                conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4214                return -EIO;
4215        }
4216
4217        if (os.conn > C_WF_REPORT_PARAMS) {
4218                if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4219                    peer_state.disk != D_NEGOTIATING ) {
4220                        /* we want resync, peer has not yet decided to sync... */
4221                        /* Nowadays only used when forcing a node into primary role and
4222                           setting its disk to UpToDate with that */
4223                        drbd_send_uuids(peer_device);
4224                        drbd_send_current_state(peer_device);
4225                }
4226        }
4227
4228        clear_bit(DISCARD_MY_DATA, &device->flags);
4229
4230        drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4231
4232        return 0;
4233}
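
/* Editor's note: receive_state() above follows an optimistic retry pattern.
 * It snapshots the device state under req_lock, drops the lock to run
 * handshake logic that may sleep (drbd_sync_handshake() and friends), then
 * re-takes the lock and compares os.i against the live state; if anything
 * changed in the meantime, it jumps back to the retry: label instead of
 * applying a decision that was computed from stale state.
 */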
4234
4235static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4236{
4237        struct drbd_peer_device *peer_device;
4238        struct drbd_device *device;
4239        struct p_rs_uuid *p = pi->data;
4240
4241        peer_device = conn_peer_device(connection, pi->vnr);
4242        if (!peer_device)
4243                return -EIO;
4244        device = peer_device->device;
4245
4246        wait_event(device->misc_wait,
4247                   device->state.conn == C_WF_SYNC_UUID ||
4248                   device->state.conn == C_BEHIND ||
4249                   device->state.conn < C_CONNECTED ||
4250                   device->state.disk < D_NEGOTIATING);
4251
4252        /* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4253
4254        /* Here the _drbd_uuid_ functions are right: current should
4255           _not_ be rotated into the history */
4256        if (get_ldev_if_state(device, D_NEGOTIATING)) {
4257                _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4258                _drbd_uuid_set(device, UI_BITMAP, 0UL);
4259
4260                drbd_print_uuids(device, "updated sync uuid");
4261                drbd_start_resync(device, C_SYNC_TARGET);
4262
4263                put_ldev(device);
4264        } else
4265                drbd_err(device, "Ignoring SyncUUID packet!\n");
4266
4267        return 0;
4268}
4269
4270/**
4271 * receive_bitmap_plain() - receive one chunk of a plain-text bitmap transfer
4272 *
4273 * Return 0 when done, 1 when another iteration is needed, and a negative error
4274 * code upon failure.
4275 */
4276static int
4277receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4278                     unsigned long *p, struct bm_xfer_ctx *c)
4279{
4280        unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4281                                 drbd_header_size(peer_device->connection);
4282        unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4283                                       c->bm_words - c->word_offset);
4284        unsigned int want = num_words * sizeof(*p);
4285        int err;
4286
4287        if (want != size) {
4288                drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4289                return -EIO;
4290        }
4291        if (want == 0)
4292                return 0;
4293        err = drbd_recv_all(peer_device->connection, p, want);
4294        if (err)
4295                return err;
4296
4297        drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4298
4299        c->word_offset += num_words;
4300        c->bit_offset = c->word_offset * BITS_PER_LONG;
4301        if (c->bit_offset > c->bm_bits)
4302                c->bit_offset = c->bm_bits;
4303
4304        return 1;
4305}
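
/* Editor's note: a worked example of the sizing above, assuming
 * DRBD_SOCKET_BUFFER_SIZE is 4096 and the agreed header takes 16 bytes
 * (both depend on configuration and protocol version):
 *
 *	data_size   = 4096 - 16 = 4080 bytes
 *	num_words   = 4080 / sizeof(long) = 510 on 64 bit
 *	bits/packet = 510 * 64 = 32640
 *
 * With one bitmap bit per 4 KiB block that is roughly 127 MiB of device
 * per plain packet, which is why the caller keeps iterating until this
 * function returns 0 instead of 1.
 */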
4306
4307static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4308{
4309        return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4310}
4311
4312static int dcbp_get_start(struct p_compressed_bm *p)
4313{
4314        return (p->encoding & 0x80) != 0;
4315}
4316
4317static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4318{
4319        return (p->encoding >> 4) & 0x7;
4320}
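
/* Editor's note: the three accessors above unpack one encoding byte, whose
 * layout is implied by the masks: bits 0-3 carry the drbd_bitmap_code,
 * bits 4-6 the number of pad bits in the last byte, and bit 7 whether the
 * first run-length describes set bits.  For an (invented) encoding byte
 * of 0xb2:
 *
 *	dcbp_get_code():     0xb2 & 0x0f       == 2, i.e. RLE_VLI_Bits
 *	dcbp_get_pad_bits(): (0xb2 >> 4) & 0x7 == 3
 *	dcbp_get_start():    0xb2 & 0x80       != 0, first run is "set"
 */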
4321
4322/**
4323 * recv_bm_rle_bits() - decode one RLE/VLI compressed bitmap chunk
4324 *
4325 * Return 0 when done, 1 when another iteration is needed, and a negative error
4326 * code upon failure.
4327 */
4328static int
4329recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4330                struct p_compressed_bm *p,
4331                 struct bm_xfer_ctx *c,
4332                 unsigned int len)
4333{
4334        struct bitstream bs;
4335        u64 look_ahead;
4336        u64 rl;
4337        u64 tmp;
4338        unsigned long s = c->bit_offset;
4339        unsigned long e;
4340        int toggle = dcbp_get_start(p);
4341        int have;
4342        int bits;
4343
4344        bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4345
4346        bits = bitstream_get_bits(&bs, &look_ahead, 64);
4347        if (bits < 0)
4348                return -EIO;
4349
4350        for (have = bits; have > 0; s += rl, toggle = !toggle) {
4351                bits = vli_decode_bits(&rl, look_ahead);
4352                if (bits <= 0)
4353                        return -EIO;
4354
4355                if (toggle) {
4356                        e = s + rl - 1;
4357                        if (e >= c->bm_bits) {
4358                                drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4359                                return -EIO;
4360                        }
4361                        _drbd_bm_set_bits(peer_device->device, s, e);
4362                }
4363
4364                if (have < bits) {
4365                        drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4366                                have, bits, look_ahead,
4367                                (unsigned int)(bs.cur.b - p->code),
4368                                (unsigned int)bs.buf_len);
4369                        return -EIO;
4370                }
4371                /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4372                if (likely(bits < 64))
4373                        look_ahead >>= bits;
4374                else
4375                        look_ahead = 0;
4376                have -= bits;
4377
4378                bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4379                if (bits < 0)
4380                        return -EIO;
4381                look_ahead |= tmp << have;
4382                have += bits;
4383        }
4384
4385        c->bit_offset = s;
4386        bm_xfer_ctx_bit_to_word_offset(c);
4387
4388        return (s != c->bm_bits);
4389}
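
/* Editor's note: a minimal sketch of the run-length toggling idea the
 * decoder above implements, with the VLI bitstream handling abstracted
 * away (decode_next_run() and set_bits() are hypothetical, not drbd
 * functions):
 *
 *	unsigned long bit = start_offset;
 *	int set = first_run_is_set;
 *	u64 rl;
 *
 *	while (decode_next_run(&stream, &rl) > 0) {
 *		if (set)
 *			set_bits(bitmap, bit, bit + rl - 1);
 *		bit += rl;
 *		set = !set;
 *	}
 *
 * Only "set" runs touch the bitmap; "clear" runs merely advance the
 * offset, which is what makes RLE so compact for mostly clean bitmaps.
 */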
4390
4391/**
4392 * decode_bitmap_c() - dispatch decoding of one compressed bitmap chunk
4393 *
4394 * Return 0 when done, 1 when another iteration is needed, and a negative error
4395 * code upon failure.
4396 */
4397static int
4398decode_bitmap_c(struct drbd_peer_device *peer_device,
4399                struct p_compressed_bm *p,
4400                struct bm_xfer_ctx *c,
4401                unsigned int len)
4402{
4403        if (dcbp_get_code(p) == RLE_VLI_Bits)
4404                return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4405
4406        /* other variants had been implemented for evaluation,
4407         * but have been dropped as this one turned out to be "best"
4408         * during all our tests. */
4409
4410        drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4411        conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4412        return -EIO;
4413}
4414
4415void INFO_bm_xfer_stats(struct drbd_device *device,
4416                const char *direction, struct bm_xfer_ctx *c)
4417{
4418        /* what would it take to transfer it "plaintext" */
4419        unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4420        unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4421        unsigned int plain =
4422                header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4423                c->bm_words * sizeof(unsigned long);
4424        unsigned int total = c->bytes[0] + c->bytes[1];
4425        unsigned int r;
4426
4427        /* total cannot be zero, but just in case: */
4428        if (total == 0)
4429                return;
4430
4431        /* don't report if not compressed */
4432        if (total >= plain)
4433                return;
4434
4435        /* total < plain. check for overflow, still */
4436        r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4437                                    : (1000 * total / plain);
4438
4439        if (r > 1000)
4440                r = 1000;
4441
4442        r = 1000 - r;
4443        drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4444             "total %u; compression: %u.%u%%\n",
4445                        direction,
4446                        c->bytes[1], c->packets[1],
4447                        c->bytes[0], c->packets[0],
4448                        total, r/10, r % 10);
4449}
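
/* Editor's note: the per-mille math above in numbers.  Suppose the plain
 * transfer would have taken plain = 1048576 bytes and the compressed one
 * actually used total = 65536 bytes:
 *
 *	r = 1000 * 65536 / 1048576 = 62    (per mille transmitted)
 *	r = 1000 - 62 = 938                (printed as "93.8%")
 *
 * So the percentage reported is the fraction saved, not the fraction sent;
 * transfers that did not compress at all were already skipped above.
 */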
4450
4451/* Since we are processing the bitfield from lower addresses to higher,
4452   it does not matter whether we process it in 32 bit chunks or 64 bit
4453   chunks, as long as it is little endian. (Understand it as a byte stream,
4454   beginning with the lowest byte...) If we used big endian
4455   we would need to process it from the highest address to the lowest,
4456   in order to be agnostic to the 32 vs 64 bits issue.
4457
4458   Returns 0 on success, a negative error code otherwise. */
4459static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4460{
4461        struct drbd_peer_device *peer_device;
4462        struct drbd_device *device;
4463        struct bm_xfer_ctx c;
4464        int err;
4465
4466        peer_device = conn_peer_device(connection, pi->vnr);
4467        if (!peer_device)
4468                return -EIO;
4469        device = peer_device->device;
4470
4471        drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4472        /* you are supposed to send additional out-of-sync information
4473         * if you actually set bits during this phase */
4474
4475        c = (struct bm_xfer_ctx) {
4476                .bm_bits = drbd_bm_bits(device),
4477                .bm_words = drbd_bm_words(device),
4478        };
4479
4480        for(;;) {
4481                if (pi->cmd == P_BITMAP)
4482                        err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4483                else if (pi->cmd == P_COMPRESSED_BITMAP) {
4484                        /* MAYBE: sanity check that we speak proto >= 90,
4485                         * and the feature is enabled! */
4486                        struct p_compressed_bm *p = pi->data;
4487
4488                        if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4489                                drbd_err(device, "ReportCBitmap packet too large\n");
4490                                err = -EIO;
4491                                goto out;
4492                        }
4493                        if (pi->size <= sizeof(*p)) {
4494                                drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4495                                err = -EIO;
4496                                goto out;
4497                        }
4498                        err = drbd_recv_all(peer_device->connection, p, pi->size);
4499                        if (err)
4500                               goto out;
4501                        err = decode_bitmap_c(peer_device, p, &c, pi->size);
4502                } else {
4503                        drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4504                        err = -EIO;
4505                        goto out;
4506                }
4507
4508                c.packets[pi->cmd == P_BITMAP]++;
4509                c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4510
4511                if (err <= 0) {
4512                        if (err < 0)
4513                                goto out;
4514                        break;
4515                }
4516                err = drbd_recv_header(peer_device->connection, pi);
4517                if (err)
4518                        goto out;
4519        }
4520
4521        INFO_bm_xfer_stats(device, "receive", &c);
4522
4523        if (device->state.conn == C_WF_BITMAP_T) {
4524                enum drbd_state_rv rv;
4525
4526                err = drbd_send_bitmap(device);
4527                if (err)
4528                        goto out;
4529                /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4530                rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4531                D_ASSERT(device, rv == SS_SUCCESS);
4532        } else if (device->state.conn != C_WF_BITMAP_S) {
4533                /* admin may have requested C_DISCONNECTING,
4534                 * other threads may have noticed network errors */
4535                drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4536                    drbd_conn_str(device->state.conn));
4537        }
4538        err = 0;
4539
4540 out:
4541        drbd_bm_unlock(device);
4542        if (!err && device->state.conn == C_WF_BITMAP_S)
4543                drbd_start_resync(device, C_SYNC_SOURCE);
4544        return err;
4545}
4546
4547static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4548{
4549        drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4550                 pi->cmd, pi->size);
4551
4552        return ignore_remaining_packet(connection, pi);
4553}
4554
4555static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4556{
4557        /* Make sure we've acked all the TCP data associated
4558         * with the data requests being unplugged */
4559        drbd_tcp_quickack(connection->data.socket);
4560
4561        return 0;
4562}
4563
4564static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4565{
4566        struct drbd_peer_device *peer_device;
4567        struct drbd_device *device;
4568        struct p_block_desc *p = pi->data;
4569
4570        peer_device = conn_peer_device(connection, pi->vnr);
4571        if (!peer_device)
4572                return -EIO;
4573        device = peer_device->device;
4574
4575        switch (device->state.conn) {
4576        case C_WF_SYNC_UUID:
4577        case C_WF_BITMAP_T:
4578        case C_BEHIND:
4579                break;
4580        default:
4581                drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4582                                drbd_conn_str(device->state.conn));
4583        }
4584
4585        drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4586
4587        return 0;
4588}
4589
4590struct data_cmd {
4591        int expect_payload;
4592        size_t pkt_size;
4593        int (*fn)(struct drbd_connection *, struct packet_info *);
4594};
4595
4596static struct data_cmd drbd_cmd_handler[] = {
4597        [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4598        [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4599        [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4600        [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4601        [P_BITMAP]          = { 1, 0, receive_bitmap } ,
4602        [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4603        [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4604        [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4605        [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4606        [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4607        [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4608        [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4609        [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4610        [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4611        [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4612        [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4613        [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4614        [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4615        [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4616        [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4617        [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4618        [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4619        [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4620        [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4621        [P_TRIM]            = { 0, sizeof(struct p_trim), receive_Data },
4622};
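
/* Editor's note: pi.cmd indexes directly into the sparse table above.
 * Each entry names the fixed-size "sub header" part to pre-read into the
 * socket buffer (pkt_size), and expect_payload says whether additional
 * variable-size data may follow; drbdd() below rejects packets that carry
 * a payload their handler does not expect.
 */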
4623
4624static void drbdd(struct drbd_connection *connection)
4625{
4626        struct packet_info pi;
4627        size_t shs; /* sub header size */
4628        int err;
4629
4630        while (get_t_state(&connection->receiver) == RUNNING) {
4631                struct data_cmd *cmd;
4632
4633                drbd_thread_current_set_cpu(&connection->receiver);
4634                update_receiver_timing_details(connection, drbd_recv_header);
4635                if (drbd_recv_header(connection, &pi))
4636                        goto err_out;
4637
4638                cmd = &drbd_cmd_handler[pi.cmd];
4639                if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4640                        drbd_err(connection, "Unexpected data packet %s (0x%04x)\n",
4641                                 cmdname(pi.cmd), pi.cmd);
4642                        goto err_out;
4643                }
4644
4645                shs = cmd->pkt_size;
4646                if (pi.size > shs && !cmd->expect_payload) {
4647                        drbd_err(connection, "No payload expected %s l:%d\n",
4648                                 cmdname(pi.cmd), pi.size);
4649                        goto err_out;
4650                }
4651
4652                if (shs) {
4653                        update_receiver_timing_details(connection, drbd_recv_all_warn);
4654                        err = drbd_recv_all_warn(connection, pi.data, shs);
4655                        if (err)
4656                                goto err_out;
4657                        pi.size -= shs;
4658                }
4659
4660                update_receiver_timing_details(connection, cmd->fn);
4661                err = cmd->fn(connection, &pi);
4662                if (err) {
4663                        drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4664                                 cmdname(pi.cmd), err, pi.size);
4665                        goto err_out;
4666                }
4667        }
4668        return;
4669
4670    err_out:
4671        conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4672}
4673
4674static void conn_disconnect(struct drbd_connection *connection)
4675{
4676        struct drbd_peer_device *peer_device;
4677        enum drbd_conns oc;
4678        int vnr;
4679
4680        if (connection->cstate == C_STANDALONE)
4681                return;
4682
4683        /* We are about to start the cleanup after connection loss.
4684         * Make sure drbd_make_request knows about that.
4685         * Usually we should be in some network failure state already,
4686         * but just in case we are not, we fix it up here.
4687         */
4688        conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4689
4690        /* ack_receiver does not clean up anything. it must not interfere, either */
4691        drbd_thread_stop(&connection->ack_receiver);
4692        if (connection->ack_sender) {
4693                destroy_workqueue(connection->ack_sender);
4694                connection->ack_sender = NULL;
4695        }
4696        drbd_free_sock(connection);
4697
4698        rcu_read_lock();
4699        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4700                struct drbd_device *device = peer_device->device;
4701                kref_get(&device->kref);
4702                rcu_read_unlock();
4703                drbd_disconnected(peer_device);
4704                kref_put(&device->kref, drbd_destroy_device);
4705                rcu_read_lock();
4706        }
4707        rcu_read_unlock();
4708
4709        if (!list_empty(&connection->current_epoch->list))
4710                drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
4711        /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4712        atomic_set(&connection->current_epoch->epoch_size, 0);
4713        connection->send.seen_any_write_yet = false;
4714
4715        drbd_info(connection, "Connection closed\n");
4716
4717        if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
4718                conn_try_outdate_peer_async(connection);
4719
4720        spin_lock_irq(&connection->resource->req_lock);
4721        oc = connection->cstate;
4722        if (oc >= C_UNCONNECTED)
4723                _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4724
4725        spin_unlock_irq(&connection->resource->req_lock);
4726
4727        if (oc == C_DISCONNECTING)
4728                conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4729}
4730
4731static int drbd_disconnected(struct drbd_peer_device *peer_device)
4732{
4733        struct drbd_device *device = peer_device->device;
4734        unsigned int i;
4735
4736        /* wait for current activity to cease. */
4737        spin_lock_irq(&device->resource->req_lock);
4738        _drbd_wait_ee_list_empty(device, &device->active_ee);
4739        _drbd_wait_ee_list_empty(device, &device->sync_ee);
4740        _drbd_wait_ee_list_empty(device, &device->read_ee);
4741        spin_unlock_irq(&device->resource->req_lock);
4742
4743        /* We do not have data structures that would allow us to
4744         * get the rs_pending_cnt down to 0 again.
4745         *  * On C_SYNC_TARGET we do not have any data structures describing
4746         *    the pending RSDataRequest's we have sent.
4747         *  * On C_SYNC_SOURCE there is no data structure that tracks
4748         *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4749         *  And no, it is not the sum of the reference counts in the
4750         *  resync_LRU. The resync_LRU tracks the whole operation including
4751         *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4752         *  on the fly. */
4753        drbd_rs_cancel_all(device);
4754        device->rs_total = 0;
4755        device->rs_failed = 0;
4756        atomic_set(&device->rs_pending_cnt, 0);
4757        wake_up(&device->misc_wait);
4758
4759        del_timer_sync(&device->resync_timer);
4760        resync_timer_fn((unsigned long)device);
4761
4762        /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4763         * w_make_resync_request etc. which may still be on the worker queue
4764         * to be "canceled" */
4765        drbd_flush_workqueue(&peer_device->connection->sender_work);
4766
4767        drbd_finish_peer_reqs(device);
4768
4769        /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4770           might have queued new work. The one before drbd_finish_peer_reqs() is
4771           necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
4772        drbd_flush_workqueue(&peer_device->connection->sender_work);
4773
4774        /* need to do it again, drbd_finish_peer_reqs() may have populated it
4775         * again via drbd_try_clear_on_disk_bm(). */
4776        drbd_rs_cancel_all(device);
4777
4778        kfree(device->p_uuid);
4779        device->p_uuid = NULL;
4780
4781        if (!drbd_suspended(device))
4782                tl_clear(peer_device->connection);
4783
4784        drbd_md_sync(device);
4785
4786        /* serialize with bitmap writeout triggered by the state change,
4787         * if any. */
4788        wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags));
4789
4790        /* tcp_close and release of sendpage pages can be deferred.  I don't
4791         * want to use SO_LINGER, because apparently it can be deferred for
4792         * more than 20 seconds (longest time I checked).
4793         *
4794         * Actually we don't care for exactly when the network stack does its
4795         * put_page(), but release our reference on these pages right here.
4796         */
4797        i = drbd_free_peer_reqs(device, &device->net_ee);
4798        if (i)
4799                drbd_info(device, "net_ee not empty, killed %u entries\n", i);
4800        i = atomic_read(&device->pp_in_use_by_net);
4801        if (i)
4802                drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
4803        i = atomic_read(&device->pp_in_use);
4804        if (i)
4805                drbd_info(device, "pp_in_use = %d, expected 0\n", i);
4806
4807        D_ASSERT(device, list_empty(&device->read_ee));
4808        D_ASSERT(device, list_empty(&device->active_ee));
4809        D_ASSERT(device, list_empty(&device->sync_ee));
4810        D_ASSERT(device, list_empty(&device->done_ee));
4811
4812        return 0;
4813}
4814
4815/*
4816 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4817 * we can agree on is stored in agreed_pro_version.
4818 *
4819 * feature flags and the reserved array should be enough room for future
4820 * enhancements of the handshake protocol, and possible plugins...
4821 *
4822 * for now, they are expected to be zero, but ignored.
4823 */
4824static int drbd_send_features(struct drbd_connection *connection)
4825{
4826        struct drbd_socket *sock;
4827        struct p_connection_features *p;
4828
4829        sock = &connection->data;
4830        p = conn_prepare_command(connection, sock);
4831        if (!p)
4832                return -EIO;
4833        memset(p, 0, sizeof(*p));
4834        p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4835        p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4836        p->feature_flags = cpu_to_be32(PRO_FEATURES);
4837        return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4838}
4839
4840/*
4841 * return values:
4842 *   1 yes, we have a valid connection
4843 *   0 oops, did not work out, please try again
4844 *  -1 peer talks different language,
4845 *     no point in trying again, please go standalone.
4846 */
4847static int drbd_do_features(struct drbd_connection *connection)
4848{
4849        /* ASSERT current == connection->receiver ... */
4850        struct p_connection_features *p;
4851        const int expect = sizeof(struct p_connection_features);
4852        struct packet_info pi;
4853        int err;
4854
4855        err = drbd_send_features(connection);
4856        if (err)
4857                return 0;
4858
4859        err = drbd_recv_header(connection, &pi);
4860        if (err)
4861                return 0;
4862
4863        if (pi.cmd != P_CONNECTION_FEATURES) {
4864                drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4865                         cmdname(pi.cmd), pi.cmd);
4866                return -1;
4867        }
4868
4869        if (pi.size != expect) {
4870                drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
4871                     expect, pi.size);
4872                return -1;
4873        }
4874
4875        p = pi.data;
4876        err = drbd_recv_all_warn(connection, p, expect);
4877        if (err)
4878                return 0;
4879
4880        p->protocol_min = be32_to_cpu(p->protocol_min);
4881        p->protocol_max = be32_to_cpu(p->protocol_max);
4882        if (p->protocol_max == 0)
4883                p->protocol_max = p->protocol_min;
4884
4885        if (PRO_VERSION_MAX < p->protocol_min ||
4886            PRO_VERSION_MIN > p->protocol_max)
4887                goto incompat;
4888
4889        connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4890        connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
4891
4892        drbd_info(connection, "Handshake successful: "
4893             "Agreed network protocol version %d\n", connection->agreed_pro_version);
4894
4895        drbd_info(connection, "Agreed to%ssupport TRIM on protocol level\n",
4896                  connection->agreed_features & FF_TRIM ? " " : " not ");
4897
4898        return 1;
4899
4900 incompat:
4901        drbd_err(connection, "incompatible DRBD dialects: "
4902            "I support %d-%d, peer supports %d-%d\n",
4903            PRO_VERSION_MIN, PRO_VERSION_MAX,
4904            p->protocol_min, p->protocol_max);
4905        return -1;
4906}
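
/* Editor's note: a worked example of the negotiation above.  Assume we were
 * built with PRO_VERSION_MIN 86 and PRO_VERSION_MAX 101 (the actual values
 * live in linux/drbd.h) and the peer announces min 90, max 96:
 *
 *	incompatible?  101 < 90 -> no;  86 > 96 -> no
 *	agreed_pro_version = min(101, 96) = 96
 *
 * Feature flags are simply AND-ed, so FF_TRIM is only agreed on when both
 * sides advertise it.
 */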
4907
4908#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4909static int drbd_do_auth(struct drbd_connection *connection)
4910{
4911        drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
4912        drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4913        return -1;
4914}
4915#else
4916#define CHALLENGE_LEN 64
4917
4918/* Return value:
4919        1 - auth succeeded,
4920        0 - failed, try again (network error),
4921        -1 - auth failed, don't try again.
4922*/
4923
4924static int drbd_do_auth(struct drbd_connection *connection)
4925{
4926        struct drbd_socket *sock;
4927        char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4928        char *response = NULL;
4929        char *right_response = NULL;
4930        char *peers_ch = NULL;
4931        unsigned int key_len;
4932        char secret[SHARED_SECRET_MAX]; /* 64 byte */
4933        unsigned int resp_size;
4934        SHASH_DESC_ON_STACK(desc, connection->cram_hmac_tfm);
4935        struct packet_info pi;
4936        struct net_conf *nc;
4937        int err, rv;
4938
4939        /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4940
4941        rcu_read_lock();
4942        nc = rcu_dereference(connection->net_conf);
4943        key_len = strlen(nc->shared_secret);
4944        memcpy(secret, nc->shared_secret, key_len);
4945        rcu_read_unlock();
4946
4947        desc->tfm = connection->cram_hmac_tfm;
4948        desc->flags = 0;
4949
4950        rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
4951        if (rv) {
4952                drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
4953                rv = -1;
4954                goto fail;
4955        }
4956
4957        get_random_bytes(my_challenge, CHALLENGE_LEN);
4958
4959        sock = &connection->data;
4960        if (!conn_prepare_command(connection, sock)) {
4961                rv = 0;
4962                goto fail;
4963        }
4964        rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
4965                                my_challenge, CHALLENGE_LEN);
4966        if (!rv)
4967                goto fail;
4968
4969        err = drbd_recv_header(connection, &pi);
4970        if (err) {
4971                rv = 0;
4972                goto fail;
4973        }
4974
4975        if (pi.cmd != P_AUTH_CHALLENGE) {
4976                drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4977                         cmdname(pi.cmd), pi.cmd);
4978                rv = 0;
4979                goto fail;
4980        }
4981
4982        if (pi.size > CHALLENGE_LEN * 2) {
4983                drbd_err(connection, "expected AuthChallenge payload too big.\n");
4984                rv = -1;
4985                goto fail;
4986        }
4987
4988        if (pi.size < CHALLENGE_LEN) {
4989                drbd_err(connection, "AuthChallenge payload too small.\n");
4990                rv = -1;
4991                goto fail;
4992        }
4993
4994        peers_ch = kmalloc(pi.size, GFP_NOIO);
4995        if (peers_ch == NULL) {
4996                drbd_err(connection, "kmalloc of peers_ch failed\n");
4997                rv = -1;
4998                goto fail;
4999        }
5000
5001        err = drbd_recv_all_warn(connection, peers_ch, pi.size);
5002        if (err) {
5003                rv = 0;
5004                goto fail;
5005        }
5006
5007        if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
5008                drbd_err(connection, "Peer presented the same challenge!\n");
5009                rv = -1;
5010                goto fail;
5011        }
5012
5013        resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
5014        response = kmalloc(resp_size, GFP_NOIO);
5015        if (response == NULL) {
5016                drbd_err(connection, "kmalloc of response failed\n");
5017                rv = -1;
5018                goto fail;
5019        }
5020
5021        rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
5022        if (rv) {
5023                drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5024                rv = -1;
5025                goto fail;
5026        }
5027
5028        if (!conn_prepare_command(connection, sock)) {
5029                rv = 0;
5030                goto fail;
5031        }
5032        rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5033                                response, resp_size);
5034        if (!rv)
5035                goto fail;
5036
5037        err = drbd_recv_header(connection, &pi);
5038        if (err) {
5039                rv = 0;
5040                goto fail;
5041        }
5042
5043        if (pi.cmd != P_AUTH_RESPONSE) {
5044                drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5045                         cmdname(pi.cmd), pi.cmd);
5046                rv = 0;
5047                goto fail;
5048        }
5049
5050        if (pi.size != resp_size) {
5051                drbd_err(connection, "expected AuthResponse payload of wrong size\n");
5052                rv = 0;
5053                goto fail;
5054        }
5055
5056        err = drbd_recv_all_warn(connection, response, resp_size);
5057        if (err) {
5058                rv = 0;
5059                goto fail;
5060        }
5061
5062        right_response = kmalloc(resp_size, GFP_NOIO);
5063        if (right_response == NULL) {
5064                drbd_err(connection, "kmalloc of right_response failed\n");
5065                rv = -1;
5066                goto fail;
5067        }
5068
5069        rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
5070                                 right_response);
5071        if (rv) {
5072                drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5073                rv = -1;
5074                goto fail;
5075        }
5076
5077        rv = !memcmp(response, right_response, resp_size);
5078
5079        if (rv)
5080                drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5081                     resp_size);
5082        else
5083                rv = -1;
5084
5085 fail:
5086        kfree(peers_ch);
5087        kfree(response);
5088        kfree(right_response);
5089        shash_desc_zero(desc);
5090
5091        return rv;
5092}
5093#endif
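
/* Editor's note: a compressed view of the challenge/response handshake
 * drbd_do_auth() implements; both peers run the same sequence, so the
 * messages interleave symmetrically:
 *
 *	A -> B : P_AUTH_CHALLENGE, challenge_A (CHALLENGE_LEN random bytes)
 *	B -> A : P_AUTH_CHALLENGE, challenge_B
 *	A -> B : P_AUTH_RESPONSE,  HMAC(secret, challenge_B)
 *	B -> A : P_AUTH_RESPONSE,  HMAC(secret, challenge_A)
 *
 * Each side recomputes the HMAC over its own challenge and memcmp()s it
 * against the peer's answer, so the shared secret never crosses the wire;
 * rejecting a peer that echoes our own challenge back guards against the
 * trivial reflection attack.
 */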
5094
5095int drbd_receiver(struct drbd_thread *thi)
5096{
5097        struct drbd_connection *connection = thi->connection;
5098        int h;
5099
5100        drbd_info(connection, "receiver (re)started\n");
5101
5102        do {
5103                h = conn_connect(connection);
5104                if (h == 0) {
5105                        conn_disconnect(connection);
5106                        schedule_timeout_interruptible(HZ);
5107                }
5108                if (h == -1) {
5109                        drbd_warn(connection, "Discarding network configuration.\n");
5110                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5111                }
5112        } while (h == 0);
5113
5114        if (h > 0)
5115                drbdd(connection);
5116
5117        conn_disconnect(connection);
5118
5119        drbd_info(connection, "receiver terminated\n");
5120        return 0;
5121}
5122
5123/* ********* acknowledge sender ******** */
5124
5125static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5126{
5127        struct p_req_state_reply *p = pi->data;
5128        int retcode = be32_to_cpu(p->retcode);
5129
5130        if (retcode >= SS_SUCCESS) {
5131                set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5132        } else {
5133                set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5134                drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5135                         drbd_set_st_err_str(retcode), retcode);
5136        }
5137        wake_up(&connection->ping_wait);
5138
5139        return 0;
5140}
5141
5142static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5143{
5144        struct drbd_peer_device *peer_device;
5145        struct drbd_device *device;
5146        struct p_req_state_reply *p = pi->data;
5147        int retcode = be32_to_cpu(p->retcode);
5148
5149        peer_device = conn_peer_device(connection, pi->vnr);
5150        if (!peer_device)
5151                return -EIO;
5152        device = peer_device->device;
5153
5154        if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5155                D_ASSERT(device, connection->agreed_pro_version < 100);
5156                return got_conn_RqSReply(connection, pi);
5157        }
5158
5159        if (retcode >= SS_SUCCESS) {
5160                set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5161        } else {
5162                set_bit(CL_ST_CHG_FAIL, &device->flags);
5163                drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5164                        drbd_set_st_err_str(retcode), retcode);
5165        }
5166        wake_up(&device->state_wait);
5167
5168        return 0;
5169}
5170
5171static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
5172{
5173        return drbd_send_ping_ack(connection);
5174
5175}
5176
5177static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5178{
5179        /* restore idle timeout */
5180        connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5181        if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5182                wake_up(&connection->ping_wait);
5183
5184        return 0;
5185}
5186
5187static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5188{
5189        struct drbd_peer_device *peer_device;
5190        struct drbd_device *device;
5191        struct p_block_ack *p = pi->data;
5192        sector_t sector = be64_to_cpu(p->sector);
5193        int blksize = be32_to_cpu(p->blksize);
5194
5195        peer_device = conn_peer_device(connection, pi->vnr);
5196        if (!peer_device)
5197                return -EIO;
5198        device = peer_device->device;
5199
5200        D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5201
5202        update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5203
5204        if (get_ldev(device)) {
5205                drbd_rs_complete_io(device, sector);
5206                drbd_set_in_sync(device, sector, blksize);
5207                /* rs_same_csum is supposed to count in units of BM_BLOCK_SIZE */
5208                device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5209                put_ldev(device);
5210        }
5211        dec_rs_pending(device);
5212        atomic_add(blksize >> 9, &device->rs_sect_in);
5213
5214        return 0;
5215}
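
/* Editor's note: two unit conversions above are worth spelling out.
 * blksize is in bytes: "blksize >> 9" divides by 512 to get sectors for
 * the resync-speed accounting in rs_sect_in, while
 * "blksize >> BM_BLOCK_SHIFT" divides by the bitmap granularity (4 KiB,
 * i.e. BM_BLOCK_SHIFT of 12, as defined elsewhere in drbd) to count whole
 * bitmap bits in rs_same_csum.
 */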
5216
5217static int
5218validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5219                              struct rb_root *root, const char *func,
5220                              enum drbd_req_event what, bool missing_ok)
5221{
5222        struct drbd_request *req;
5223        struct bio_and_error m;
5224
5225        spin_lock_irq(&device->resource->req_lock);
5226        req = find_request(device, root, id, sector, missing_ok, func);
5227        if (unlikely(!req)) {
5228                spin_unlock_irq(&device->resource->req_lock);
5229                return -EIO;
5230        }
5231        __req_mod(req, what, &m);
5232        spin_unlock_irq(&device->resource->req_lock);
5233
5234        if (m.bio)
5235                complete_master_bio(device, &m);
5236        return 0;
5237}
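
/* Editor's note: the helper above is the common tail of most ACK handlers
 * below: look the request up by (id, sector) in the given interval tree,
 * feed the event into the request state machine while holding req_lock,
 * and complete the upper-layer bio only after the lock is dropped, so bio
 * completion cannot deadlock against it.
 */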
5238
5239static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5240{
5241        struct drbd_peer_device *peer_device;
5242        struct drbd_device *device;
5243        struct p_block_ack *p = pi->data;
5244        sector_t sector = be64_to_cpu(p->sector);
5245        int blksize = be32_to_cpu(p->blksize);
5246        enum drbd_req_event what;
5247
5248        peer_device = conn_peer_device(connection, pi->vnr);
5249        if (!peer_device)
5250                return -EIO;
5251        device = peer_device->device;
5252
5253        update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5254
5255        if (p->block_id == ID_SYNCER) {
5256                drbd_set_in_sync(device, sector, blksize);
5257                dec_rs_pending(device);
5258                return 0;
5259        }
5260        switch (pi->cmd) {
5261        case P_RS_WRITE_ACK:
5262                what = WRITE_ACKED_BY_PEER_AND_SIS;
5263                break;
5264        case P_WRITE_ACK:
5265                what = WRITE_ACKED_BY_PEER;
5266                break;
5267        case P_RECV_ACK:
5268                what = RECV_ACKED_BY_PEER;
5269                break;
5270        case P_SUPERSEDED:
5271                what = CONFLICT_RESOLVED;
5272                break;
5273        case P_RETRY_WRITE:
5274                what = POSTPONE_WRITE;
5275                break;
5276        default:
5277                BUG();
5278        }
5279
5280        return validate_req_change_req_state(device, p->block_id, sector,
5281                                             &device->write_requests, __func__,
5282                                             what, false);
5283}
5284
5285static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5286{
5287        struct drbd_peer_device *peer_device;
5288        struct drbd_device *device;
5289        struct p_block_ack *p = pi->data;
5290        sector_t sector = be64_to_cpu(p->sector);
5291        int size = be32_to_cpu(p->blksize);
5292        int err;
5293
5294        peer_device = conn_peer_device(connection, pi->vnr);
5295        if (!peer_device)
5296                return -EIO;
5297        device = peer_device->device;
5298
5299        update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5300
5301        if (p->block_id == ID_SYNCER) {
5302                dec_rs_pending(device);
5303                drbd_rs_failed_io(device, sector, size);
5304                return 0;
5305        }
5306
5307        err = validate_req_change_req_state(device, p->block_id, sector,
5308                                            &device->write_requests, __func__,
5309                                            NEG_ACKED, true);
5310        if (err) {
5311                /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5312                   The master bio might already be completed, therefore the
5313                   request is no longer in the collision hash. */
5314                /* In Protocol B we might already have got a P_RECV_ACK
5315                   but then get a P_NEG_ACK afterwards. */
5316                drbd_set_out_of_sync(device, sector, size);
5317        }
5318        return 0;
5319}
5320
5321static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5322{
5323        struct drbd_peer_device *peer_device;
5324        struct drbd_device *device;
5325        struct p_block_ack *p = pi->data;
5326        sector_t sector = be64_to_cpu(p->sector);
5327
5328        peer_device = conn_peer_device(connection, pi->vnr);
5329        if (!peer_device)
5330                return -EIO;
5331        device = peer_device->device;
5332
5333        update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5334
5335        drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5336            (unsigned long long)sector, be32_to_cpu(p->blksize));
5337
5338        return validate_req_change_req_state(device, p->block_id, sector,
5339                                             &device->read_requests, __func__,
5340                                             NEG_ACKED, false);
5341}

static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
{
        struct drbd_peer_device *peer_device;
        struct drbd_device *device;
        sector_t sector;
        int size;
        struct p_block_ack *p = pi->data;

        peer_device = conn_peer_device(connection, pi->vnr);
        if (!peer_device)
                return -EIO;
        device = peer_device->device;

        sector = be64_to_cpu(p->sector);
        size = be32_to_cpu(p->blksize);

        update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

        dec_rs_pending(device);

        if (get_ldev_if_state(device, D_FAILED)) {
                drbd_rs_complete_io(device, sector);
                switch (pi->cmd) {
                case P_NEG_RS_DREPLY:
                        drbd_rs_failed_io(device, sector, size);
                        /* fall through */
                case P_RS_CANCEL:
                        break;
                default:
                        BUG();
                }
                put_ldev(device);
        }

        return 0;
}
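
/*
 * P_RS_CANCEL is dispatched here as well (see ack_receiver_tbl below): a
 * cancelled resync request only needs the rs_pending accounting and the
 * drbd_rs_complete_io() cleanup, while a real negative reply additionally
 * records the failed range via drbd_rs_failed_io() -- hence the deliberate
 * fall-through above.
 */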

static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
{
        struct p_barrier_ack *p = pi->data;
        struct drbd_peer_device *peer_device;
        int vnr;

        tl_release(connection, p->barrier, be32_to_cpu(p->set_size));

        rcu_read_lock();
        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
                struct drbd_device *device = peer_device->device;

                if (device->state.conn == C_AHEAD &&
                    atomic_read(&device->ap_in_flight) == 0 &&
                    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
                        device->start_resync_timer.expires = jiffies + HZ;
                        add_timer(&device->start_resync_timer);
                }
        }
        rcu_read_unlock();

        return 0;
}
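
/*
 * A barrier ack is also the point where a device in Ahead mode may schedule
 * the switch back to SyncSource: once no application writes are in flight
 * anymore, the start_resync_timer is armed one second out, so the actual
 * state transition happens outside the ack receiver's critical path.
 */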

static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
{
        struct drbd_peer_device *peer_device;
        struct drbd_device *device;
        struct p_block_ack *p = pi->data;
        struct drbd_device_work *dw;
        sector_t sector;
        int size;

        peer_device = conn_peer_device(connection, pi->vnr);
        if (!peer_device)
                return -EIO;
        device = peer_device->device;

        sector = be64_to_cpu(p->sector);
        size = be32_to_cpu(p->blksize);

        update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

        if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
                drbd_ov_out_of_sync_found(device, sector, size);
        else
                ov_out_of_sync_print(device);

        if (!get_ldev(device))
                return 0;

        drbd_rs_complete_io(device, sector);
        dec_rs_pending(device);

        --device->ov_left;

        /* let's advance progress step marks only for every other megabyte */
        if ((device->ov_left & 0x200) == 0x200)
                drbd_advance_rs_marks(device, device->ov_left);

        if (device->ov_left == 0) {
                dw = kmalloc(sizeof(*dw), GFP_NOIO);
                if (dw) {
                        dw->w.cb = w_ov_finished;
                        dw->device = device;
                        drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
                } else {
                        drbd_err(device, "kmalloc(dw) failed.\n");
                        ov_out_of_sync_print(device);
                        drbd_resync_finished(device);
                }
        }
        put_ldev(device);
        return 0;
}
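
/*
 * Progress-mark arithmetic, as a sketch (assuming ov_left counts bitmap
 * bits of 4 KiB each): bit 9 of ov_left toggles every 0x200 == 512 bits,
 *
 *      512 bits * 4 KiB/bit = 2 MiB
 *
 * i.e. every other megabyte of verified data, matching the comment above.
 * drbd_advance_rs_marks() then updates the rate/ETA bookkeeping.
 */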

static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
{
        return 0;
}
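
/*
 * got_skip() intentionally does nothing: the receive loop has already
 * drained the payload into the receive buffer, so packets this side does
 * not evaluate (currently P_DELAY_PROBE, see ack_receiver_tbl) are simply
 * discarded while keeping the byte stream in sync.
 */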

struct meta_sock_cmd {
        size_t pkt_size;
        int (*fn)(struct drbd_connection *connection, struct packet_info *);
};

static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
{
        long t;
        struct net_conf *nc;

        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);
        /* ping_timeo is configured in tenths of a second, ping_int in seconds */
        t = ping_timeout ? nc->ping_timeo : nc->ping_int;
        rcu_read_unlock();

        t *= HZ;
        if (ping_timeout)
                t /= 10;

        connection->meta.socket->sk->sk_rcvtimeo = t;
}

static void set_ping_timeout(struct drbd_connection *connection)
{
        set_rcvtimeo(connection, true);
}

static void set_idle_timeout(struct drbd_connection *connection)
{
        set_rcvtimeo(connection, false);
}
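
/*
 * Worked example, assuming HZ == 250 and the documented defaults
 * ping_timeo == 5 (i.e. 0.5 s) and ping_int == 10 (seconds):
 *
 *      ping timeout: t = 5 * 250 / 10 =  125 jiffies = 500 ms
 *      idle timeout: t = 10 * 250     = 2500 jiffies =  10 s
 *
 * While waiting for a P_PING_ACK the receive timeout is therefore short;
 * when idle it equals the ping interval, after which a new ping is sent.
 */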

static struct meta_sock_cmd ack_receiver_tbl[] = {
        [P_PING]            = { 0, got_Ping },
        [P_PING_ACK]        = { 0, got_PingAck },
        [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
        [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
        [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
        [P_SUPERSEDED]      = { sizeof(struct p_block_ack), got_BlockAck },
        [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
        [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
        [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
        [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
        [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
        [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
        [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
        [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
        [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
        [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
        [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
};
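
/*
 * Opcodes not listed above are left zero-initialized by the designated
 * initializers, i.e. their ->fn is NULL.  The receive loop below
 * range-checks pi.cmd against the table and rejects NULL handlers with
 * "Unexpected meta packet" before dispatching.
 */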

int drbd_ack_receiver(struct drbd_thread *thi)
{
        struct drbd_connection *connection = thi->connection;
        struct meta_sock_cmd *cmd = NULL;
        struct packet_info pi;
        unsigned long pre_recv_jif;
        int rv;
        void *buf    = connection->meta.rbuf;
        int received = 0;
        unsigned int header_size = drbd_header_size(connection);
        int expect   = header_size;
        bool ping_timeout_active = false;
        struct sched_param param = { .sched_priority = 2 };

        rv = sched_setscheduler(current, SCHED_RR, &param);
        if (rv < 0)
                drbd_err(connection, "drbd_ack_receiver: ERROR set priority, ret=%d\n", rv);

        while (get_t_state(thi) == RUNNING) {
                drbd_thread_current_set_cpu(thi);

                conn_reclaim_net_peer_reqs(connection);

                if (test_and_clear_bit(SEND_PING, &connection->flags)) {
                        if (drbd_send_ping(connection)) {
                                drbd_err(connection, "drbd_send_ping has failed\n");
                                goto reconnect;
                        }
                        set_ping_timeout(connection);
                        ping_timeout_active = true;
                }

                pre_recv_jif = jiffies;
                rv = drbd_recv_short(connection->meta.socket, buf, expect - received, 0);

                /* Note:
                 * -EINTR        (on meta) we got a signal
                 * -EAGAIN       (on meta) rcvtimeo expired
                 * -ECONNRESET   other side closed the connection
                 * -ERESTARTSYS  (on data) we got a signal
                 * rv <  0       other than above: unexpected error!
                 * rv == expect  full header or command received
                 * rv <  expect  "woken" by a signal during receive
                 * rv == 0       connection shut down by peer
                 */
                if (likely(rv > 0)) {
                        received += rv;
                        buf      += rv;
                } else if (rv == 0) {
                        if (test_bit(DISCONNECT_SENT, &connection->flags)) {
                                long t;
                                rcu_read_lock();
                                t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
                                rcu_read_unlock();

                                t = wait_event_timeout(connection->ping_wait,
                                                       connection->cstate < C_WF_REPORT_PARAMS,
                                                       t);
                                if (t)
                                        break;
                        }
                        drbd_err(connection, "meta connection shut down by peer.\n");
                        goto reconnect;
                } else if (rv == -EAGAIN) {
                        /* If the data socket received something meanwhile,
                         * that is good enough: peer is still alive. */
                        if (time_after(connection->last_received, pre_recv_jif))
                                continue;
                        if (ping_timeout_active) {
                                drbd_err(connection, "PingAck did not arrive in time.\n");
                                goto reconnect;
                        }
                        set_bit(SEND_PING, &connection->flags);
                        continue;
                } else if (rv == -EINTR) {
                        /* maybe drbd_thread_stop(): the while condition will notice.
                         * maybe woken for send_ping: we'll send a ping above,
                         * and change the rcvtimeo */
                        flush_signals(current);
                        continue;
                } else {
                        drbd_err(connection, "sock_recvmsg returned %d\n", rv);
                        goto reconnect;
                }

                if (received == expect && cmd == NULL) {
                        if (decode_header(connection, connection->meta.rbuf, &pi))
                                goto reconnect;
                        /* validate pi.cmd before indexing the table */
                        if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) ||
                            !ack_receiver_tbl[pi.cmd].fn) {
                                drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
                                         cmdname(pi.cmd), pi.cmd);
                                goto disconnect;
                        }
                        cmd = &ack_receiver_tbl[pi.cmd];
                        expect = header_size + cmd->pkt_size;
                        if (pi.size != expect - header_size) {
                                drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
                                        pi.cmd, pi.size);
                                goto reconnect;
                        }
                }
                if (received == expect) {
                        bool err;

                        err = cmd->fn(connection, &pi);
                        if (err) {
                                drbd_err(connection, "%pf failed\n", cmd->fn);
                                goto reconnect;
                        }

                        connection->last_received = jiffies;

                        if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
                                set_idle_timeout(connection);
                                ping_timeout_active = false;
                        }

                        buf      = connection->meta.rbuf;
                        received = 0;
                        expect   = header_size;
                        cmd      = NULL;
                }
        }

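        /*
         * The "if (0)" blocks below are only ever entered via goto: the
         * idiom keeps the error-exit statements out of the fall-through
         * path of the loop above while still sharing the common
         * termination code at the end of the function.
         */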
        if (0) {
reconnect:
                conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
                conn_md_sync(connection);
        }
        if (0) {
disconnect:
                conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
        }

        drbd_info(connection, "ack_receiver terminated\n");

        return 0;
}

void drbd_send_acks_wf(struct work_struct *ws)
{
        struct drbd_peer_device *peer_device =
                container_of(ws, struct drbd_peer_device, send_acks_work);
        struct drbd_connection *connection = peer_device->connection;
        struct drbd_device *device = peer_device->device;
        struct net_conf *nc;
        int tcp_cork, err;

        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);
        tcp_cork = nc->tcp_cork;
        rcu_read_unlock();

        if (tcp_cork)
                drbd_tcp_cork(connection->meta.socket);

        err = drbd_finish_peer_reqs(device);
        kref_put(&device->kref, drbd_destroy_device);
        /* The matching get is in drbd_endio_write_sec_final(); it is needed
           to keep struct work_struct send_acks_work, which lives in the
           peer_device object, alive until we get here. */

        if (err) {
                conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
                return;
        }

        if (tcp_cork)
                drbd_tcp_uncork(connection->meta.socket);
}
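
/*
 * The cork/uncork pair above is a batching optimization: with TCP_CORK set
 * on the meta socket, the individual acks generated by
 * drbd_finish_peer_reqs() accumulate and go out in as few segments as
 * possible once the socket is uncorked.  The socket is not uncorked on the
 * error path, since the connection is being torn down anyway.
 */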