linux/drivers/block/drbd/drbd_receiver.c
<<
>>
Prefs
   1/*
   2   drbd_receiver.c
   3
   4   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
   5
   6   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   7   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   8   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
   9
  10   drbd is free software; you can redistribute it and/or modify
  11   it under the terms of the GNU General Public License as published by
  12   the Free Software Foundation; either version 2, or (at your option)
  13   any later version.
  14
  15   drbd is distributed in the hope that it will be useful,
  16   but WITHOUT ANY WARRANTY; without even the implied warranty of
  17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  18   GNU General Public License for more details.
  19
  20   You should have received a copy of the GNU General Public License
  21   along with drbd; see the file COPYING.  If not, write to
  22   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
  23 */
  24
  25
  26#include <linux/module.h>
  27
  28#include <linux/uaccess.h>
  29#include <net/sock.h>
  30
  31#include <linux/drbd.h>
  32#include <linux/fs.h>
  33#include <linux/file.h>
  34#include <linux/in.h>
  35#include <linux/mm.h>
  36#include <linux/memcontrol.h>
  37#include <linux/mm_inline.h>
  38#include <linux/slab.h>
  39#include <linux/pkt_sched.h>
  40#define __KERNEL_SYSCALLS__
  41#include <linux/unistd.h>
  42#include <linux/vmalloc.h>
  43#include <linux/random.h>
  44#include <linux/string.h>
  45#include <linux/scatterlist.h>
  46#include "drbd_int.h"
  47#include "drbd_protocol.h"
  48#include "drbd_req.h"
  49#include "drbd_vli.h"
  50
  51#define PRO_FEATURES (DRBD_FF_TRIM|DRBD_FF_THIN_RESYNC|DRBD_FF_WSAME)
  52
  53struct packet_info {
  54        enum drbd_packet cmd;
  55        unsigned int size;
  56        unsigned int vnr;
  57        void *data;
  58};
  59
  60enum finish_epoch {
  61        FE_STILL_LIVE,
  62        FE_DESTROYED,
  63        FE_RECYCLED,
  64};
  65
  66static int drbd_do_features(struct drbd_connection *connection);
  67static int drbd_do_auth(struct drbd_connection *connection);
  68static int drbd_disconnected(struct drbd_peer_device *);
  69static void conn_wait_active_ee_empty(struct drbd_connection *connection);
  70static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
  71static int e_end_block(struct drbd_work *, int);
  72
  73
  74#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
  75
  76/*
  77 * some helper functions to deal with single linked page lists,
  78 * page->private being our "next" pointer.
  79 */
  80
  81/* If at least n pages are linked at head, get n pages off.
  82 * Otherwise, don't modify head, and return NULL.
  83 * Locking is the responsibility of the caller.
  84 */
  85static struct page *page_chain_del(struct page **head, int n)
  86{
  87        struct page *page;
  88        struct page *tmp;
  89
  90        BUG_ON(!n);
  91        BUG_ON(!head);
  92
  93        page = *head;
  94
  95        if (!page)
  96                return NULL;
  97
  98        while (page) {
  99                tmp = page_chain_next(page);
 100                if (--n == 0)
 101                        break; /* found sufficient pages */
 102                if (tmp == NULL)
 103                        /* insufficient pages, don't use any of them. */
 104                        return NULL;
 105                page = tmp;
 106        }
 107
 108        /* add end of list marker for the returned list */
 109        set_page_private(page, 0);
 110        /* actual return value, and adjustment of head */
 111        page = *head;
 112        *head = tmp;
 113        return page;
 114}
 115
 116/* may be used outside of locks to find the tail of a (usually short)
 117 * "private" page chain, before adding it back to a global chain head
 118 * with page_chain_add() under a spinlock. */
 119static struct page *page_chain_tail(struct page *page, int *len)
 120{
 121        struct page *tmp;
 122        int i = 1;
 123        while ((tmp = page_chain_next(page)))
 124                ++i, page = tmp;
 125        if (len)
 126                *len = i;
 127        return page;
 128}
 129
 130static int page_chain_free(struct page *page)
 131{
 132        struct page *tmp;
 133        int i = 0;
 134        page_chain_for_each_safe(page, tmp) {
 135                put_page(page);
 136                ++i;
 137        }
 138        return i;
 139}
 140
 141static void page_chain_add(struct page **head,
 142                struct page *chain_first, struct page *chain_last)
 143{
 144#if 1
 145        struct page *tmp;
 146        tmp = page_chain_tail(chain_first, NULL);
 147        BUG_ON(tmp != chain_last);
 148#endif
 149
 150        /* add chain to head */
 151        set_page_private(chain_last, (unsigned long)*head);
 152        *head = chain_first;
 153}
 154
 155static struct page *__drbd_alloc_pages(struct drbd_device *device,
 156                                       unsigned int number)
 157{
 158        struct page *page = NULL;
 159        struct page *tmp = NULL;
 160        unsigned int i = 0;
 161
 162        /* Yes, testing drbd_pp_vacant outside the lock is racy.
 163         * So what. It saves a spin_lock. */
 164        if (drbd_pp_vacant >= number) {
 165                spin_lock(&drbd_pp_lock);
 166                page = page_chain_del(&drbd_pp_pool, number);
 167                if (page)
 168                        drbd_pp_vacant -= number;
 169                spin_unlock(&drbd_pp_lock);
 170                if (page)
 171                        return page;
 172        }
 173
 174        /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
 175         * "criss-cross" setup, that might cause write-out on some other DRBD,
 176         * which in turn might block on the other node at this very place.  */
 177        for (i = 0; i < number; i++) {
 178                tmp = alloc_page(GFP_TRY);
 179                if (!tmp)
 180                        break;
 181                set_page_private(tmp, (unsigned long)page);
 182                page = tmp;
 183        }
 184
 185        if (i == number)
 186                return page;
 187
 188        /* Not enough pages immediately available this time.
 189         * No need to jump around here, drbd_alloc_pages will retry this
 190         * function "soon". */
 191        if (page) {
 192                tmp = page_chain_tail(page, NULL);
 193                spin_lock(&drbd_pp_lock);
 194                page_chain_add(&drbd_pp_pool, page, tmp);
 195                drbd_pp_vacant += i;
 196                spin_unlock(&drbd_pp_lock);
 197        }
 198        return NULL;
 199}
 200
 201static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
 202                                           struct list_head *to_be_freed)
 203{
 204        struct drbd_peer_request *peer_req, *tmp;
 205
 206        /* The EEs are always appended to the end of the list. Since
 207           they are sent in order over the wire, they have to finish
 208           in order. As soon as we see the first not finished we can
 209           stop to examine the list... */
 210
 211        list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
 212                if (drbd_peer_req_has_active_page(peer_req))
 213                        break;
 214                list_move(&peer_req->w.list, to_be_freed);
 215        }
 216}
 217
 218static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
 219{
 220        LIST_HEAD(reclaimed);
 221        struct drbd_peer_request *peer_req, *t;
 222
 223        spin_lock_irq(&device->resource->req_lock);
 224        reclaim_finished_net_peer_reqs(device, &reclaimed);
 225        spin_unlock_irq(&device->resource->req_lock);
 226        list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
 227                drbd_free_net_peer_req(device, peer_req);
 228}
 229
 230static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
 231{
 232        struct drbd_peer_device *peer_device;
 233        int vnr;
 234
 235        rcu_read_lock();
 236        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
 237                struct drbd_device *device = peer_device->device;
 238                if (!atomic_read(&device->pp_in_use_by_net))
 239                        continue;
 240
 241                kref_get(&device->kref);
 242                rcu_read_unlock();
 243                drbd_reclaim_net_peer_reqs(device);
 244                kref_put(&device->kref, drbd_destroy_device);
 245                rcu_read_lock();
 246        }
 247        rcu_read_unlock();
 248}
 249
 250/**
 251 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 252 * @device:     DRBD device.
 253 * @number:     number of pages requested
 254 * @retry:      whether to retry, if not enough pages are available right now
 255 *
 256 * Tries to allocate number pages, first from our own page pool, then from
 257 * the kernel.
 258 * Possibly retry until DRBD frees sufficient pages somewhere else.
 259 *
 260 * If this allocation would exceed the max_buffers setting, we throttle
 261 * allocation (schedule_timeout) to give the system some room to breathe.
 262 *
 263 * We do not use max-buffers as hard limit, because it could lead to
 264 * congestion and further to a distributed deadlock during online-verify or
 265 * (checksum based) resync, if the max-buffers, socket buffer sizes and
 266 * resync-rate settings are mis-configured.
 267 *
 268 * Returns a page chain linked via page->private.
 269 */
 270struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
 271                              bool retry)
 272{
 273        struct drbd_device *device = peer_device->device;
 274        struct page *page = NULL;
 275        struct net_conf *nc;
 276        DEFINE_WAIT(wait);
 277        unsigned int mxb;
 278
 279        rcu_read_lock();
 280        nc = rcu_dereference(peer_device->connection->net_conf);
 281        mxb = nc ? nc->max_buffers : 1000000;
 282        rcu_read_unlock();
 283
 284        if (atomic_read(&device->pp_in_use) < mxb)
 285                page = __drbd_alloc_pages(device, number);
 286
 287        /* Try to keep the fast path fast, but occasionally we need
 288         * to reclaim the pages we lended to the network stack. */
 289        if (page && atomic_read(&device->pp_in_use_by_net) > 512)
 290                drbd_reclaim_net_peer_reqs(device);
 291
 292        while (page == NULL) {
 293                prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
 294
 295                drbd_reclaim_net_peer_reqs(device);
 296
 297                if (atomic_read(&device->pp_in_use) < mxb) {
 298                        page = __drbd_alloc_pages(device, number);
 299                        if (page)
 300                                break;
 301                }
 302
 303                if (!retry)
 304                        break;
 305
 306                if (signal_pending(current)) {
 307                        drbd_warn(device, "drbd_alloc_pages interrupted!\n");
 308                        break;
 309                }
 310
 311                if (schedule_timeout(HZ/10) == 0)
 312                        mxb = UINT_MAX;
 313        }
 314        finish_wait(&drbd_pp_wait, &wait);
 315
 316        if (page)
 317                atomic_add(number, &device->pp_in_use);
 318        return page;
 319}
 320
 321/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 322 * Is also used from inside an other spin_lock_irq(&resource->req_lock);
 323 * Either links the page chain back to the global pool,
 324 * or returns all pages to the system. */
 325static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
 326{
 327        atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
 328        int i;
 329
 330        if (page == NULL)
 331                return;
 332
 333        if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
 334                i = page_chain_free(page);
 335        else {
 336                struct page *tmp;
 337                tmp = page_chain_tail(page, &i);
 338                spin_lock(&drbd_pp_lock);
 339                page_chain_add(&drbd_pp_pool, page, tmp);
 340                drbd_pp_vacant += i;
 341                spin_unlock(&drbd_pp_lock);
 342        }
 343        i = atomic_sub_return(i, a);
 344        if (i < 0)
 345                drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
 346                        is_net ? "pp_in_use_by_net" : "pp_in_use", i);
 347        wake_up(&drbd_pp_wait);
 348}
 349
 350/*
 351You need to hold the req_lock:
 352 _drbd_wait_ee_list_empty()
 353
 354You must not have the req_lock:
 355 drbd_free_peer_req()
 356 drbd_alloc_peer_req()
 357 drbd_free_peer_reqs()
 358 drbd_ee_fix_bhs()
 359 drbd_finish_peer_reqs()
 360 drbd_clear_done_ee()
 361 drbd_wait_ee_list_empty()
 362*/
 363
 364/* normal: payload_size == request size (bi_size)
 365 * w_same: payload_size == logical_block_size
 366 * trim: payload_size == 0 */
 367struct drbd_peer_request *
 368drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
 369                    unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local)
 370{
 371        struct drbd_device *device = peer_device->device;
 372        struct drbd_peer_request *peer_req;
 373        struct page *page = NULL;
 374        unsigned nr_pages = (payload_size + PAGE_SIZE -1) >> PAGE_SHIFT;
 375
 376        if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
 377                return NULL;
 378
 379        peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
 380        if (!peer_req) {
 381                if (!(gfp_mask & __GFP_NOWARN))
 382                        drbd_err(device, "%s: allocation failed\n", __func__);
 383                return NULL;
 384        }
 385
 386        if (nr_pages) {
 387                page = drbd_alloc_pages(peer_device, nr_pages,
 388                                        gfpflags_allow_blocking(gfp_mask));
 389                if (!page)
 390                        goto fail;
 391        }
 392
 393        memset(peer_req, 0, sizeof(*peer_req));
 394        INIT_LIST_HEAD(&peer_req->w.list);
 395        drbd_clear_interval(&peer_req->i);
 396        peer_req->i.size = request_size;
 397        peer_req->i.sector = sector;
 398        peer_req->submit_jif = jiffies;
 399        peer_req->peer_device = peer_device;
 400        peer_req->pages = page;
 401        /*
 402         * The block_id is opaque to the receiver.  It is not endianness
 403         * converted, and sent back to the sender unchanged.
 404         */
 405        peer_req->block_id = id;
 406
 407        return peer_req;
 408
 409 fail:
 410        mempool_free(peer_req, drbd_ee_mempool);
 411        return NULL;
 412}
 413
 414void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
 415                       int is_net)
 416{
 417        might_sleep();
 418        if (peer_req->flags & EE_HAS_DIGEST)
 419                kfree(peer_req->digest);
 420        drbd_free_pages(device, peer_req->pages, is_net);
 421        D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
 422        D_ASSERT(device, drbd_interval_empty(&peer_req->i));
 423        if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
 424                peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
 425                drbd_al_complete_io(device, &peer_req->i);
 426        }
 427        mempool_free(peer_req, drbd_ee_mempool);
 428}
 429
 430int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
 431{
 432        LIST_HEAD(work_list);
 433        struct drbd_peer_request *peer_req, *t;
 434        int count = 0;
 435        int is_net = list == &device->net_ee;
 436
 437        spin_lock_irq(&device->resource->req_lock);
 438        list_splice_init(list, &work_list);
 439        spin_unlock_irq(&device->resource->req_lock);
 440
 441        list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
 442                __drbd_free_peer_req(device, peer_req, is_net);
 443                count++;
 444        }
 445        return count;
 446}
 447
 448/*
 449 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
 450 */
 451static int drbd_finish_peer_reqs(struct drbd_device *device)
 452{
 453        LIST_HEAD(work_list);
 454        LIST_HEAD(reclaimed);
 455        struct drbd_peer_request *peer_req, *t;
 456        int err = 0;
 457
 458        spin_lock_irq(&device->resource->req_lock);
 459        reclaim_finished_net_peer_reqs(device, &reclaimed);
 460        list_splice_init(&device->done_ee, &work_list);
 461        spin_unlock_irq(&device->resource->req_lock);
 462
 463        list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
 464                drbd_free_net_peer_req(device, peer_req);
 465
 466        /* possible callbacks here:
 467         * e_end_block, and e_end_resync_block, e_send_superseded.
 468         * all ignore the last argument.
 469         */
 470        list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
 471                int err2;
 472
 473                /* list_del not necessary, next/prev members not touched */
 474                err2 = peer_req->w.cb(&peer_req->w, !!err);
 475                if (!err)
 476                        err = err2;
 477                drbd_free_peer_req(device, peer_req);
 478        }
 479        wake_up(&device->ee_wait);
 480
 481        return err;
 482}
 483
 484static void _drbd_wait_ee_list_empty(struct drbd_device *device,
 485                                     struct list_head *head)
 486{
 487        DEFINE_WAIT(wait);
 488
 489        /* avoids spin_lock/unlock
 490         * and calling prepare_to_wait in the fast path */
 491        while (!list_empty(head)) {
 492                prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
 493                spin_unlock_irq(&device->resource->req_lock);
 494                io_schedule();
 495                finish_wait(&device->ee_wait, &wait);
 496                spin_lock_irq(&device->resource->req_lock);
 497        }
 498}
 499
 500static void drbd_wait_ee_list_empty(struct drbd_device *device,
 501                                    struct list_head *head)
 502{
 503        spin_lock_irq(&device->resource->req_lock);
 504        _drbd_wait_ee_list_empty(device, head);
 505        spin_unlock_irq(&device->resource->req_lock);
 506}
 507
 508static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
 509{
 510        struct kvec iov = {
 511                .iov_base = buf,
 512                .iov_len = size,
 513        };
 514        struct msghdr msg = {
 515                .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
 516        };
 517        return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
 518}
 519
 520static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
 521{
 522        int rv;
 523
 524        rv = drbd_recv_short(connection->data.socket, buf, size, 0);
 525
 526        if (rv < 0) {
 527                if (rv == -ECONNRESET)
 528                        drbd_info(connection, "sock was reset by peer\n");
 529                else if (rv != -ERESTARTSYS)
 530                        drbd_err(connection, "sock_recvmsg returned %d\n", rv);
 531        } else if (rv == 0) {
 532                if (test_bit(DISCONNECT_SENT, &connection->flags)) {
 533                        long t;
 534                        rcu_read_lock();
 535                        t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
 536                        rcu_read_unlock();
 537
 538                        t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
 539
 540                        if (t)
 541                                goto out;
 542                }
 543                drbd_info(connection, "sock was shut down by peer\n");
 544        }
 545
 546        if (rv != size)
 547                conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
 548
 549out:
 550        return rv;
 551}
 552
 553static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
 554{
 555        int err;
 556
 557        err = drbd_recv(connection, buf, size);
 558        if (err != size) {
 559                if (err >= 0)
 560                        err = -EIO;
 561        } else
 562                err = 0;
 563        return err;
 564}
 565
 566static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
 567{
 568        int err;
 569
 570        err = drbd_recv_all(connection, buf, size);
 571        if (err && !signal_pending(current))
 572                drbd_warn(connection, "short read (expected size %d)\n", (int)size);
 573        return err;
 574}
 575
 576/* quoting tcp(7):
 577 *   On individual connections, the socket buffer size must be set prior to the
 578 *   listen(2) or connect(2) calls in order to have it take effect.
 579 * This is our wrapper to do so.
 580 */
 581static void drbd_setbufsize(struct socket *sock, unsigned int snd,
 582                unsigned int rcv)
 583{
 584        /* open coded SO_SNDBUF, SO_RCVBUF */
 585        if (snd) {
 586                sock->sk->sk_sndbuf = snd;
 587                sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
 588        }
 589        if (rcv) {
 590                sock->sk->sk_rcvbuf = rcv;
 591                sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
 592        }
 593}
 594
 595static struct socket *drbd_try_connect(struct drbd_connection *connection)
 596{
 597        const char *what;
 598        struct socket *sock;
 599        struct sockaddr_in6 src_in6;
 600        struct sockaddr_in6 peer_in6;
 601        struct net_conf *nc;
 602        int err, peer_addr_len, my_addr_len;
 603        int sndbuf_size, rcvbuf_size, connect_int;
 604        int disconnect_on_error = 1;
 605
 606        rcu_read_lock();
 607        nc = rcu_dereference(connection->net_conf);
 608        if (!nc) {
 609                rcu_read_unlock();
 610                return NULL;
 611        }
 612        sndbuf_size = nc->sndbuf_size;
 613        rcvbuf_size = nc->rcvbuf_size;
 614        connect_int = nc->connect_int;
 615        rcu_read_unlock();
 616
 617        my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
 618        memcpy(&src_in6, &connection->my_addr, my_addr_len);
 619
 620        if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
 621                src_in6.sin6_port = 0;
 622        else
 623                ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
 624
 625        peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
 626        memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
 627
 628        what = "sock_create_kern";
 629        err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
 630                               SOCK_STREAM, IPPROTO_TCP, &sock);
 631        if (err < 0) {
 632                sock = NULL;
 633                goto out;
 634        }
 635
 636        sock->sk->sk_rcvtimeo =
 637        sock->sk->sk_sndtimeo = connect_int * HZ;
 638        drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
 639
 640       /* explicitly bind to the configured IP as source IP
 641        *  for the outgoing connections.
 642        *  This is needed for multihomed hosts and to be
 643        *  able to use lo: interfaces for drbd.
 644        * Make sure to use 0 as port number, so linux selects
 645        *  a free one dynamically.
 646        */
 647        what = "bind before connect";
 648        err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
 649        if (err < 0)
 650                goto out;
 651
 652        /* connect may fail, peer not yet available.
 653         * stay C_WF_CONNECTION, don't go Disconnecting! */
 654        disconnect_on_error = 0;
 655        what = "connect";
 656        err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
 657
 658out:
 659        if (err < 0) {
 660                if (sock) {
 661                        sock_release(sock);
 662                        sock = NULL;
 663                }
 664                switch (-err) {
 665                        /* timeout, busy, signal pending */
 666                case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
 667                case EINTR: case ERESTARTSYS:
 668                        /* peer not (yet) available, network problem */
 669                case ECONNREFUSED: case ENETUNREACH:
 670                case EHOSTDOWN:    case EHOSTUNREACH:
 671                        disconnect_on_error = 0;
 672                        break;
 673                default:
 674                        drbd_err(connection, "%s failed, err = %d\n", what, err);
 675                }
 676                if (disconnect_on_error)
 677                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
 678        }
 679
 680        return sock;
 681}
 682
 683struct accept_wait_data {
 684        struct drbd_connection *connection;
 685        struct socket *s_listen;
 686        struct completion door_bell;
 687        void (*original_sk_state_change)(struct sock *sk);
 688
 689};
 690
 691static void drbd_incoming_connection(struct sock *sk)
 692{
 693        struct accept_wait_data *ad = sk->sk_user_data;
 694        void (*state_change)(struct sock *sk);
 695
 696        state_change = ad->original_sk_state_change;
 697        if (sk->sk_state == TCP_ESTABLISHED)
 698                complete(&ad->door_bell);
 699        state_change(sk);
 700}
 701
 702static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
 703{
 704        int err, sndbuf_size, rcvbuf_size, my_addr_len;
 705        struct sockaddr_in6 my_addr;
 706        struct socket *s_listen;
 707        struct net_conf *nc;
 708        const char *what;
 709
 710        rcu_read_lock();
 711        nc = rcu_dereference(connection->net_conf);
 712        if (!nc) {
 713                rcu_read_unlock();
 714                return -EIO;
 715        }
 716        sndbuf_size = nc->sndbuf_size;
 717        rcvbuf_size = nc->rcvbuf_size;
 718        rcu_read_unlock();
 719
 720        my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
 721        memcpy(&my_addr, &connection->my_addr, my_addr_len);
 722
 723        what = "sock_create_kern";
 724        err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
 725                               SOCK_STREAM, IPPROTO_TCP, &s_listen);
 726        if (err) {
 727                s_listen = NULL;
 728                goto out;
 729        }
 730
 731        s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
 732        drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
 733
 734        what = "bind before listen";
 735        err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
 736        if (err < 0)
 737                goto out;
 738
 739        ad->s_listen = s_listen;
 740        write_lock_bh(&s_listen->sk->sk_callback_lock);
 741        ad->original_sk_state_change = s_listen->sk->sk_state_change;
 742        s_listen->sk->sk_state_change = drbd_incoming_connection;
 743        s_listen->sk->sk_user_data = ad;
 744        write_unlock_bh(&s_listen->sk->sk_callback_lock);
 745
 746        what = "listen";
 747        err = s_listen->ops->listen(s_listen, 5);
 748        if (err < 0)
 749                goto out;
 750
 751        return 0;
 752out:
 753        if (s_listen)
 754                sock_release(s_listen);
 755        if (err < 0) {
 756                if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
 757                        drbd_err(connection, "%s failed, err = %d\n", what, err);
 758                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
 759                }
 760        }
 761
 762        return -EIO;
 763}
 764
 765static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
 766{
 767        write_lock_bh(&sk->sk_callback_lock);
 768        sk->sk_state_change = ad->original_sk_state_change;
 769        sk->sk_user_data = NULL;
 770        write_unlock_bh(&sk->sk_callback_lock);
 771}
 772
 773static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
 774{
 775        int timeo, connect_int, err = 0;
 776        struct socket *s_estab = NULL;
 777        struct net_conf *nc;
 778
 779        rcu_read_lock();
 780        nc = rcu_dereference(connection->net_conf);
 781        if (!nc) {
 782                rcu_read_unlock();
 783                return NULL;
 784        }
 785        connect_int = nc->connect_int;
 786        rcu_read_unlock();
 787
 788        timeo = connect_int * HZ;
 789        /* 28.5% random jitter */
 790        timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
 791
 792        err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
 793        if (err <= 0)
 794                return NULL;
 795
 796        err = kernel_accept(ad->s_listen, &s_estab, 0);
 797        if (err < 0) {
 798                if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
 799                        drbd_err(connection, "accept failed, err = %d\n", err);
 800                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
 801                }
 802        }
 803
 804        if (s_estab)
 805                unregister_state_change(s_estab->sk, ad);
 806
 807        return s_estab;
 808}
 809
 810static int decode_header(struct drbd_connection *, void *, struct packet_info *);
 811
 812static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
 813                             enum drbd_packet cmd)
 814{
 815        if (!conn_prepare_command(connection, sock))
 816                return -EIO;
 817        return conn_send_command(connection, sock, cmd, 0, NULL, 0);
 818}
 819
 820static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
 821{
 822        unsigned int header_size = drbd_header_size(connection);
 823        struct packet_info pi;
 824        struct net_conf *nc;
 825        int err;
 826
 827        rcu_read_lock();
 828        nc = rcu_dereference(connection->net_conf);
 829        if (!nc) {
 830                rcu_read_unlock();
 831                return -EIO;
 832        }
 833        sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
 834        rcu_read_unlock();
 835
 836        err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
 837        if (err != header_size) {
 838                if (err >= 0)
 839                        err = -EIO;
 840                return err;
 841        }
 842        err = decode_header(connection, connection->data.rbuf, &pi);
 843        if (err)
 844                return err;
 845        return pi.cmd;
 846}
 847
 848/**
 849 * drbd_socket_okay() - Free the socket if its connection is not okay
 850 * @sock:       pointer to the pointer to the socket.
 851 */
 852static bool drbd_socket_okay(struct socket **sock)
 853{
 854        int rr;
 855        char tb[4];
 856
 857        if (!*sock)
 858                return false;
 859
 860        rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
 861
 862        if (rr > 0 || rr == -EAGAIN) {
 863                return true;
 864        } else {
 865                sock_release(*sock);
 866                *sock = NULL;
 867                return false;
 868        }
 869}
 870
 871static bool connection_established(struct drbd_connection *connection,
 872                                   struct socket **sock1,
 873                                   struct socket **sock2)
 874{
 875        struct net_conf *nc;
 876        int timeout;
 877        bool ok;
 878
 879        if (!*sock1 || !*sock2)
 880                return false;
 881
 882        rcu_read_lock();
 883        nc = rcu_dereference(connection->net_conf);
 884        timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
 885        rcu_read_unlock();
 886        schedule_timeout_interruptible(timeout);
 887
 888        ok = drbd_socket_okay(sock1);
 889        ok = drbd_socket_okay(sock2) && ok;
 890
 891        return ok;
 892}
 893
 894/* Gets called if a connection is established, or if a new minor gets created
 895   in a connection */
 896int drbd_connected(struct drbd_peer_device *peer_device)
 897{
 898        struct drbd_device *device = peer_device->device;
 899        int err;
 900
 901        atomic_set(&device->packet_seq, 0);
 902        device->peer_seq = 0;
 903
 904        device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
 905                &peer_device->connection->cstate_mutex :
 906                &device->own_state_mutex;
 907
 908        err = drbd_send_sync_param(peer_device);
 909        if (!err)
 910                err = drbd_send_sizes(peer_device, 0, 0);
 911        if (!err)
 912                err = drbd_send_uuids(peer_device);
 913        if (!err)
 914                err = drbd_send_current_state(peer_device);
 915        clear_bit(USE_DEGR_WFC_T, &device->flags);
 916        clear_bit(RESIZE_PENDING, &device->flags);
 917        atomic_set(&device->ap_in_flight, 0);
 918        mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
 919        return err;
 920}
 921
 922/*
 923 * return values:
 924 *   1 yes, we have a valid connection
 925 *   0 oops, did not work out, please try again
 926 *  -1 peer talks different language,
 927 *     no point in trying again, please go standalone.
 928 *  -2 We do not have a network config...
 929 */
 930static int conn_connect(struct drbd_connection *connection)
 931{
 932        struct drbd_socket sock, msock;
 933        struct drbd_peer_device *peer_device;
 934        struct net_conf *nc;
 935        int vnr, timeout, h;
 936        bool discard_my_data, ok;
 937        enum drbd_state_rv rv;
 938        struct accept_wait_data ad = {
 939                .connection = connection,
 940                .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
 941        };
 942
 943        clear_bit(DISCONNECT_SENT, &connection->flags);
 944        if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
 945                return -2;
 946
 947        mutex_init(&sock.mutex);
 948        sock.sbuf = connection->data.sbuf;
 949        sock.rbuf = connection->data.rbuf;
 950        sock.socket = NULL;
 951        mutex_init(&msock.mutex);
 952        msock.sbuf = connection->meta.sbuf;
 953        msock.rbuf = connection->meta.rbuf;
 954        msock.socket = NULL;
 955
 956        /* Assume that the peer only understands protocol 80 until we know better.  */
 957        connection->agreed_pro_version = 80;
 958
 959        if (prepare_listen_socket(connection, &ad))
 960                return 0;
 961
 962        do {
 963                struct socket *s;
 964
 965                s = drbd_try_connect(connection);
 966                if (s) {
 967                        if (!sock.socket) {
 968                                sock.socket = s;
 969                                send_first_packet(connection, &sock, P_INITIAL_DATA);
 970                        } else if (!msock.socket) {
 971                                clear_bit(RESOLVE_CONFLICTS, &connection->flags);
 972                                msock.socket = s;
 973                                send_first_packet(connection, &msock, P_INITIAL_META);
 974                        } else {
 975                                drbd_err(connection, "Logic error in conn_connect()\n");
 976                                goto out_release_sockets;
 977                        }
 978                }
 979
 980                if (connection_established(connection, &sock.socket, &msock.socket))
 981                        break;
 982
 983retry:
 984                s = drbd_wait_for_connect(connection, &ad);
 985                if (s) {
 986                        int fp = receive_first_packet(connection, s);
 987                        drbd_socket_okay(&sock.socket);
 988                        drbd_socket_okay(&msock.socket);
 989                        switch (fp) {
 990                        case P_INITIAL_DATA:
 991                                if (sock.socket) {
 992                                        drbd_warn(connection, "initial packet S crossed\n");
 993                                        sock_release(sock.socket);
 994                                        sock.socket = s;
 995                                        goto randomize;
 996                                }
 997                                sock.socket = s;
 998                                break;
 999                        case P_INITIAL_META:
1000                                set_bit(RESOLVE_CONFLICTS, &connection->flags);
1001                                if (msock.socket) {
1002                                        drbd_warn(connection, "initial packet M crossed\n");
1003                                        sock_release(msock.socket);
1004                                        msock.socket = s;
1005                                        goto randomize;
1006                                }
1007                                msock.socket = s;
1008                                break;
1009                        default:
1010                                drbd_warn(connection, "Error receiving initial packet\n");
1011                                sock_release(s);
1012randomize:
1013                                if (prandom_u32() & 1)
1014                                        goto retry;
1015                        }
1016                }
1017
1018                if (connection->cstate <= C_DISCONNECTING)
1019                        goto out_release_sockets;
1020                if (signal_pending(current)) {
1021                        flush_signals(current);
1022                        smp_rmb();
1023                        if (get_t_state(&connection->receiver) == EXITING)
1024                                goto out_release_sockets;
1025                }
1026
1027                ok = connection_established(connection, &sock.socket, &msock.socket);
1028        } while (!ok);
1029
1030        if (ad.s_listen)
1031                sock_release(ad.s_listen);
1032
1033        sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1034        msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1035
1036        sock.socket->sk->sk_allocation = GFP_NOIO;
1037        msock.socket->sk->sk_allocation = GFP_NOIO;
1038
1039        sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1040        msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1041
1042        /* NOT YET ...
1043         * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1044         * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1045         * first set it to the P_CONNECTION_FEATURES timeout,
1046         * which we set to 4x the configured ping_timeout. */
1047        rcu_read_lock();
1048        nc = rcu_dereference(connection->net_conf);
1049
1050        sock.socket->sk->sk_sndtimeo =
1051        sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1052
1053        msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1054        timeout = nc->timeout * HZ / 10;
1055        discard_my_data = nc->discard_my_data;
1056        rcu_read_unlock();
1057
1058        msock.socket->sk->sk_sndtimeo = timeout;
1059
1060        /* we don't want delays.
1061         * we use TCP_CORK where appropriate, though */
1062        drbd_tcp_nodelay(sock.socket);
1063        drbd_tcp_nodelay(msock.socket);
1064
1065        connection->data.socket = sock.socket;
1066        connection->meta.socket = msock.socket;
1067        connection->last_received = jiffies;
1068
1069        h = drbd_do_features(connection);
1070        if (h <= 0)
1071                return h;
1072
1073        if (connection->cram_hmac_tfm) {
1074                /* drbd_request_state(device, NS(conn, WFAuth)); */
1075                switch (drbd_do_auth(connection)) {
1076                case -1:
1077                        drbd_err(connection, "Authentication of peer failed\n");
1078                        return -1;
1079                case 0:
1080                        drbd_err(connection, "Authentication of peer failed, trying again.\n");
1081                        return 0;
1082                }
1083        }
1084
1085        connection->data.socket->sk->sk_sndtimeo = timeout;
1086        connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1087
1088        if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1089                return -1;
1090
1091        /* Prevent a race between resync-handshake and
1092         * being promoted to Primary.
1093         *
1094         * Grab and release the state mutex, so we know that any current
1095         * drbd_set_role() is finished, and any incoming drbd_set_role
1096         * will see the STATE_SENT flag, and wait for it to be cleared.
1097         */
1098        idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1099                mutex_lock(peer_device->device->state_mutex);
1100
1101        set_bit(STATE_SENT, &connection->flags);
1102
1103        idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1104                mutex_unlock(peer_device->device->state_mutex);
1105
1106        rcu_read_lock();
1107        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1108                struct drbd_device *device = peer_device->device;
1109                kref_get(&device->kref);
1110                rcu_read_unlock();
1111
1112                if (discard_my_data)
1113                        set_bit(DISCARD_MY_DATA, &device->flags);
1114                else
1115                        clear_bit(DISCARD_MY_DATA, &device->flags);
1116
1117                drbd_connected(peer_device);
1118                kref_put(&device->kref, drbd_destroy_device);
1119                rcu_read_lock();
1120        }
1121        rcu_read_unlock();
1122
1123        rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1124        if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1125                clear_bit(STATE_SENT, &connection->flags);
1126                return 0;
1127        }
1128
1129        drbd_thread_start(&connection->ack_receiver);
1130        /* opencoded create_singlethread_workqueue(),
1131         * to be able to use format string arguments */
1132        connection->ack_sender =
1133                alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
1134        if (!connection->ack_sender) {
1135                drbd_err(connection, "Failed to create workqueue ack_sender\n");
1136                return 0;
1137        }
1138
1139        mutex_lock(&connection->resource->conf_update);
1140        /* The discard_my_data flag is a single-shot modifier to the next
1141         * connection attempt, the handshake of which is now well underway.
1142         * No need for rcu style copying of the whole struct
1143         * just to clear a single value. */
1144        connection->net_conf->discard_my_data = 0;
1145        mutex_unlock(&connection->resource->conf_update);
1146
1147        return h;
1148
1149out_release_sockets:
1150        if (ad.s_listen)
1151                sock_release(ad.s_listen);
1152        if (sock.socket)
1153                sock_release(sock.socket);
1154        if (msock.socket)
1155                sock_release(msock.socket);
1156        return -1;
1157}
1158
1159static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1160{
1161        unsigned int header_size = drbd_header_size(connection);
1162
1163        if (header_size == sizeof(struct p_header100) &&
1164            *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1165                struct p_header100 *h = header;
1166                if (h->pad != 0) {
1167                        drbd_err(connection, "Header padding is not zero\n");
1168                        return -EINVAL;
1169                }
1170                pi->vnr = be16_to_cpu(h->volume);
1171                pi->cmd = be16_to_cpu(h->command);
1172                pi->size = be32_to_cpu(h->length);
1173        } else if (header_size == sizeof(struct p_header95) &&
1174                   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1175                struct p_header95 *h = header;
1176                pi->cmd = be16_to_cpu(h->command);
1177                pi->size = be32_to_cpu(h->length);
1178                pi->vnr = 0;
1179        } else if (header_size == sizeof(struct p_header80) &&
1180                   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1181                struct p_header80 *h = header;
1182                pi->cmd = be16_to_cpu(h->command);
1183                pi->size = be16_to_cpu(h->length);
1184                pi->vnr = 0;
1185        } else {
1186                drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1187                         be32_to_cpu(*(__be32 *)header),
1188                         connection->agreed_pro_version);
1189                return -EINVAL;
1190        }
1191        pi->data = header + header_size;
1192        return 0;
1193}
1194
1195static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1196{
1197        void *buffer = connection->data.rbuf;
1198        int err;
1199
1200        err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1201        if (err)
1202                return err;
1203
1204        err = decode_header(connection, buffer, pi);
1205        connection->last_received = jiffies;
1206
1207        return err;
1208}
1209
1210/* This is blkdev_issue_flush, but asynchronous.
1211 * We want to submit to all component volumes in parallel,
1212 * then wait for all completions.
1213 */
1214struct issue_flush_context {
1215        atomic_t pending;
1216        int error;
1217        struct completion done;
1218};
1219struct one_flush_context {
1220        struct drbd_device *device;
1221        struct issue_flush_context *ctx;
1222};
1223
1224void one_flush_endio(struct bio *bio)
1225{
1226        struct one_flush_context *octx = bio->bi_private;
1227        struct drbd_device *device = octx->device;
1228        struct issue_flush_context *ctx = octx->ctx;
1229
1230        if (bio->bi_error) {
1231                ctx->error = bio->bi_error;
1232                drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_error);
1233        }
1234        kfree(octx);
1235        bio_put(bio);
1236
1237        clear_bit(FLUSH_PENDING, &device->flags);
1238        put_ldev(device);
1239        kref_put(&device->kref, drbd_destroy_device);
1240
1241        if (atomic_dec_and_test(&ctx->pending))
1242                complete(&ctx->done);
1243}
1244
1245static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
1246{
1247        struct bio *bio = bio_alloc(GFP_NOIO, 0);
1248        struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);
1249        if (!bio || !octx) {
1250                drbd_warn(device, "Could not allocate a bio, CANNOT ISSUE FLUSH\n");
1251                /* FIXME: what else can I do now?  disconnecting or detaching
1252                 * really does not help to improve the state of the world, either.
1253                 */
1254                kfree(octx);
1255                if (bio)
1256                        bio_put(bio);
1257
1258                ctx->error = -ENOMEM;
1259                put_ldev(device);
1260                kref_put(&device->kref, drbd_destroy_device);
1261                return;
1262        }
1263
1264        octx->device = device;
1265        octx->ctx = ctx;
1266        bio->bi_bdev = device->ldev->backing_bdev;
1267        bio->bi_private = octx;
1268        bio->bi_end_io = one_flush_endio;
1269        bio_set_op_attrs(bio, REQ_OP_FLUSH, WRITE_FLUSH);
1270
1271        device->flush_jif = jiffies;
1272        set_bit(FLUSH_PENDING, &device->flags);
1273        atomic_inc(&ctx->pending);
1274        submit_bio(bio);
1275}
1276
1277static void drbd_flush(struct drbd_connection *connection)
1278{
1279        if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1280                struct drbd_peer_device *peer_device;
1281                struct issue_flush_context ctx;
1282                int vnr;
1283
1284                atomic_set(&ctx.pending, 1);
1285                ctx.error = 0;
1286                init_completion(&ctx.done);
1287
1288                rcu_read_lock();
1289                idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1290                        struct drbd_device *device = peer_device->device;
1291
1292                        if (!get_ldev(device))
1293                                continue;
1294                        kref_get(&device->kref);
1295                        rcu_read_unlock();
1296
1297                        submit_one_flush(device, &ctx);
1298
1299                        rcu_read_lock();
1300                }
1301                rcu_read_unlock();
1302
1303                /* Do we want to add a timeout,
1304                 * if disk-timeout is set? */
1305                if (!atomic_dec_and_test(&ctx.pending))
1306                        wait_for_completion(&ctx.done);
1307
1308                if (ctx.error) {
1309                        /* would rather check on EOPNOTSUPP, but that is not reliable.
1310                         * don't try again for ANY return value != 0
1311                         * if (rv == -EOPNOTSUPP) */
1312                        /* Any error is already reported by bio_endio callback. */
1313                        drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1314                }
1315        }
1316}
1317
1318/**
1319 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1320 * @device:     DRBD device.
1321 * @epoch:      Epoch object.
1322 * @ev:         Epoch event.
1323 */
1324static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1325                                               struct drbd_epoch *epoch,
1326                                               enum epoch_event ev)
1327{
1328        int epoch_size;
1329        struct drbd_epoch *next_epoch;
1330        enum finish_epoch rv = FE_STILL_LIVE;
1331
1332        spin_lock(&connection->epoch_lock);
1333        do {
1334                next_epoch = NULL;
1335
1336                epoch_size = atomic_read(&epoch->epoch_size);
1337
1338                switch (ev & ~EV_CLEANUP) {
1339                case EV_PUT:
1340                        atomic_dec(&epoch->active);
1341                        break;
1342                case EV_GOT_BARRIER_NR:
1343                        set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1344                        break;
1345                case EV_BECAME_LAST:
1346                        /* nothing to do*/
1347                        break;
1348                }
1349
1350                if (epoch_size != 0 &&
1351                    atomic_read(&epoch->active) == 0 &&
1352                    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1353                        if (!(ev & EV_CLEANUP)) {
1354                                spin_unlock(&connection->epoch_lock);
1355                                drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1356                                spin_lock(&connection->epoch_lock);
1357                        }
1358#if 0
1359                        /* FIXME: dec unacked on connection, once we have
1360                         * something to count pending connection packets in. */
1361                        if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1362                                dec_unacked(epoch->connection);
1363#endif
1364
1365                        if (connection->current_epoch != epoch) {
1366                                next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1367                                list_del(&epoch->list);
1368                                ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1369                                connection->epochs--;
1370                                kfree(epoch);
1371
1372                                if (rv == FE_STILL_LIVE)
1373                                        rv = FE_DESTROYED;
1374                        } else {
1375                                epoch->flags = 0;
1376                                atomic_set(&epoch->epoch_size, 0);
1377                                /* atomic_set(&epoch->active, 0); is already zero */
1378                                if (rv == FE_STILL_LIVE)
1379                                        rv = FE_RECYCLED;
1380                        }
1381                }
1382
1383                if (!next_epoch)
1384                        break;
1385
1386                epoch = next_epoch;
1387        } while (1);
1388
1389        spin_unlock(&connection->epoch_lock);
1390
1391        return rv;
1392}
1393
1394static enum write_ordering_e
1395max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1396{
1397        struct disk_conf *dc;
1398
1399        dc = rcu_dereference(bdev->disk_conf);
1400
1401        if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1402                wo = WO_DRAIN_IO;
1403        if (wo == WO_DRAIN_IO && !dc->disk_drain)
1404                wo = WO_NONE;
1405
1406        return wo;
1407}
1408
1409/**
1410 * drbd_bump_write_ordering() - Fall back to an other write ordering method
1411 * @connection: DRBD connection.
1412 * @wo:         Write ordering method to try.
1413 */
1414void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1415                              enum write_ordering_e wo)
1416{
1417        struct drbd_device *device;
1418        enum write_ordering_e pwo;
1419        int vnr;
1420        static char *write_ordering_str[] = {
1421                [WO_NONE] = "none",
1422                [WO_DRAIN_IO] = "drain",
1423                [WO_BDEV_FLUSH] = "flush",
1424        };
1425
1426        pwo = resource->write_ordering;
1427        if (wo != WO_BDEV_FLUSH)
1428                wo = min(pwo, wo);
1429        rcu_read_lock();
1430        idr_for_each_entry(&resource->devices, device, vnr) {
1431                if (get_ldev(device)) {
1432                        wo = max_allowed_wo(device->ldev, wo);
1433                        if (device->ldev == bdev)
1434                                bdev = NULL;
1435                        put_ldev(device);
1436                }
1437        }
1438
1439        if (bdev)
1440                wo = max_allowed_wo(bdev, wo);
1441
1442        rcu_read_unlock();
1443
1444        resource->write_ordering = wo;
1445        if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1446                drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1447}
1448
1449/*
1450 * We *may* ignore the discard-zeroes-data setting, if so configured.
1451 *
1452 * Assumption is that it "discard_zeroes_data=0" is only because the backend
1453 * may ignore partial unaligned discards.
1454 *
1455 * LVM/DM thin as of at least
1456 *   LVM version:     2.02.115(2)-RHEL7 (2015-01-28)
1457 *   Library version: 1.02.93-RHEL7 (2015-01-28)
1458 *   Driver version:  4.29.0
1459 * still behaves this way.
1460 *
1461 * For unaligned (wrt. alignment and granularity) or too small discards,
1462 * we zero-out the initial (and/or) trailing unaligned partial chunks,
1463 * but discard all the aligned full chunks.
1464 *
1465 * At least for LVM/DM thin, the result is effectively "discard_zeroes_data=1".
1466 */
1467int drbd_issue_discard_or_zero_out(struct drbd_device *device, sector_t start, unsigned int nr_sectors, bool discard)
1468{
1469        struct block_device *bdev = device->ldev->backing_bdev;
1470        struct request_queue *q = bdev_get_queue(bdev);
1471        sector_t tmp, nr;
1472        unsigned int max_discard_sectors, granularity;
1473        int alignment;
1474        int err = 0;
1475
1476        if (!discard)
1477                goto zero_out;
1478
1479        /* Zero-sector (unknown) and one-sector granularities are the same.  */
1480        granularity = max(q->limits.discard_granularity >> 9, 1U);
1481        alignment = (bdev_discard_alignment(bdev) >> 9) % granularity;
1482
1483        max_discard_sectors = min(q->limits.max_discard_sectors, (1U << 22));
1484        max_discard_sectors -= max_discard_sectors % granularity;
1485        if (unlikely(!max_discard_sectors))
1486                goto zero_out;
1487
1488        if (nr_sectors < granularity)
1489                goto zero_out;
1490
1491        tmp = start;
1492        if (sector_div(tmp, granularity) != alignment) {
1493                if (nr_sectors < 2*granularity)
1494                        goto zero_out;
1495                /* start + gran - (start + gran - align) % gran */
1496                tmp = start + granularity - alignment;
1497                tmp = start + granularity - sector_div(tmp, granularity);
1498
1499                nr = tmp - start;
1500                err |= blkdev_issue_zeroout(bdev, start, nr, GFP_NOIO, 0);
1501                nr_sectors -= nr;
1502                start = tmp;
1503        }
1504        while (nr_sectors >= granularity) {
1505                nr = min_t(sector_t, nr_sectors, max_discard_sectors);
1506                err |= blkdev_issue_discard(bdev, start, nr, GFP_NOIO, 0);
1507                nr_sectors -= nr;
1508                start += nr;
1509        }
1510 zero_out:
1511        if (nr_sectors) {
1512                err |= blkdev_issue_zeroout(bdev, start, nr_sectors, GFP_NOIO, 0);
1513        }
1514        return err != 0;
1515}
1516
1517static bool can_do_reliable_discards(struct drbd_device *device)
1518{
1519        struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
1520        struct disk_conf *dc;
1521        bool can_do;
1522
1523        if (!blk_queue_discard(q))
1524                return false;
1525
1526        if (q->limits.discard_zeroes_data)
1527                return true;
1528
1529        rcu_read_lock();
1530        dc = rcu_dereference(device->ldev->disk_conf);
1531        can_do = dc->discard_zeroes_if_aligned;
1532        rcu_read_unlock();
1533        return can_do;
1534}
1535
1536static void drbd_issue_peer_discard(struct drbd_device *device, struct drbd_peer_request *peer_req)
1537{
1538        /* If the backend cannot discard, or does not guarantee
1539         * read-back zeroes in discarded ranges, we fall back to
1540         * zero-out.  Unless configuration specifically requested
1541         * otherwise. */
1542        if (!can_do_reliable_discards(device))
1543                peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT;
1544
1545        if (drbd_issue_discard_or_zero_out(device, peer_req->i.sector,
1546            peer_req->i.size >> 9, !(peer_req->flags & EE_IS_TRIM_USE_ZEROOUT)))
1547                peer_req->flags |= EE_WAS_ERROR;
1548        drbd_endio_write_sec_final(peer_req);
1549}
1550
1551static void drbd_issue_peer_wsame(struct drbd_device *device,
1552                                  struct drbd_peer_request *peer_req)
1553{
1554        struct block_device *bdev = device->ldev->backing_bdev;
1555        sector_t s = peer_req->i.sector;
1556        sector_t nr = peer_req->i.size >> 9;
1557        if (blkdev_issue_write_same(bdev, s, nr, GFP_NOIO, peer_req->pages))
1558                peer_req->flags |= EE_WAS_ERROR;
1559        drbd_endio_write_sec_final(peer_req);
1560}
1561
1562
1563/**
1564 * drbd_submit_peer_request()
1565 * @device:     DRBD device.
1566 * @peer_req:   peer request
1567 * @rw:         flag field, see bio->bi_opf
1568 *
1569 * May spread the pages to multiple bios,
1570 * depending on bio_add_page restrictions.
1571 *
1572 * Returns 0 if all bios have been submitted,
1573 * -ENOMEM if we could not allocate enough bios,
1574 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1575 *  single page to an empty bio (which should never happen and likely indicates
1576 *  that the lower level IO stack is in some way broken). This has been observed
1577 *  on certain Xen deployments.
1578 */
1579/* TODO allocate from our own bio_set. */
1580int drbd_submit_peer_request(struct drbd_device *device,
1581                             struct drbd_peer_request *peer_req,
1582                             const unsigned op, const unsigned op_flags,
1583                             const int fault_type)
1584{
1585        struct bio *bios = NULL;
1586        struct bio *bio;
1587        struct page *page = peer_req->pages;
1588        sector_t sector = peer_req->i.sector;
1589        unsigned data_size = peer_req->i.size;
1590        unsigned n_bios = 0;
1591        unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1592        int err = -ENOMEM;
1593
1594        /* TRIM/DISCARD: for now, always use the helper function
1595         * blkdev_issue_zeroout(..., discard=true).
1596         * It's synchronous, but it does the right thing wrt. bio splitting.
1597         * Correctness first, performance later.  Next step is to code an
1598         * asynchronous variant of the same.
1599         */
1600        if (peer_req->flags & (EE_IS_TRIM|EE_WRITE_SAME)) {
1601                /* wait for all pending IO completions, before we start
1602                 * zeroing things out. */
1603                conn_wait_active_ee_empty(peer_req->peer_device->connection);
1604                /* add it to the active list now,
1605                 * so we can find it to present it in debugfs */
1606                peer_req->submit_jif = jiffies;
1607                peer_req->flags |= EE_SUBMITTED;
1608
1609                /* If this was a resync request from receive_rs_deallocated(),
1610                 * it is already on the sync_ee list */
1611                if (list_empty(&peer_req->w.list)) {
1612                        spin_lock_irq(&device->resource->req_lock);
1613                        list_add_tail(&peer_req->w.list, &device->active_ee);
1614                        spin_unlock_irq(&device->resource->req_lock);
1615                }
1616
1617                if (peer_req->flags & EE_IS_TRIM)
1618                        drbd_issue_peer_discard(device, peer_req);
1619                else /* EE_WRITE_SAME */
1620                        drbd_issue_peer_wsame(device, peer_req);
1621                return 0;
1622        }
1623
1624        /* In most cases, we will only need one bio.  But in case the lower
1625         * level restrictions happen to be different at this offset on this
1626         * side than those of the sending peer, we may need to submit the
1627         * request in more than one bio.
1628         *
1629         * Plain bio_alloc is good enough here, this is no DRBD internally
1630         * generated bio, but a bio allocated on behalf of the peer.
1631         */
1632next_bio:
1633        bio = bio_alloc(GFP_NOIO, nr_pages);
1634        if (!bio) {
1635                drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1636                goto fail;
1637        }
1638        /* > peer_req->i.sector, unless this is the first bio */
1639        bio->bi_iter.bi_sector = sector;
1640        bio->bi_bdev = device->ldev->backing_bdev;
1641        bio_set_op_attrs(bio, op, op_flags);
1642        bio->bi_private = peer_req;
1643        bio->bi_end_io = drbd_peer_request_endio;
1644
1645        bio->bi_next = bios;
1646        bios = bio;
1647        ++n_bios;
1648
1649        page_chain_for_each(page) {
1650                unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1651                if (!bio_add_page(bio, page, len, 0)) {
1652                        /* A single page must always be possible!
1653                         * But in case it fails anyways,
1654                         * we deal with it, and complain (below). */
1655                        if (bio->bi_vcnt == 0) {
1656                                drbd_err(device,
1657                                        "bio_add_page failed for len=%u, "
1658                                        "bi_vcnt=0 (bi_sector=%llu)\n",
1659                                        len, (uint64_t)bio->bi_iter.bi_sector);
1660                                err = -ENOSPC;
1661                                goto fail;
1662                        }
1663                        goto next_bio;
1664                }
1665                data_size -= len;
1666                sector += len >> 9;
1667                --nr_pages;
1668        }
1669        D_ASSERT(device, data_size == 0);
1670        D_ASSERT(device, page == NULL);
1671
1672        atomic_set(&peer_req->pending_bios, n_bios);
1673        /* for debugfs: update timestamp, mark as submitted */
1674        peer_req->submit_jif = jiffies;
1675        peer_req->flags |= EE_SUBMITTED;
1676        do {
1677                bio = bios;
1678                bios = bios->bi_next;
1679                bio->bi_next = NULL;
1680
1681                drbd_generic_make_request(device, fault_type, bio);
1682        } while (bios);
1683        return 0;
1684
1685fail:
1686        while (bios) {
1687                bio = bios;
1688                bios = bios->bi_next;
1689                bio_put(bio);
1690        }
1691        return err;
1692}
1693
1694static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1695                                             struct drbd_peer_request *peer_req)
1696{
1697        struct drbd_interval *i = &peer_req->i;
1698
1699        drbd_remove_interval(&device->write_requests, i);
1700        drbd_clear_interval(i);
1701
1702        /* Wake up any processes waiting for this peer request to complete.  */
1703        if (i->waiting)
1704                wake_up(&device->misc_wait);
1705}
1706
1707static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1708{
1709        struct drbd_peer_device *peer_device;
1710        int vnr;
1711
1712        rcu_read_lock();
1713        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1714                struct drbd_device *device = peer_device->device;
1715
1716                kref_get(&device->kref);
1717                rcu_read_unlock();
1718                drbd_wait_ee_list_empty(device, &device->active_ee);
1719                kref_put(&device->kref, drbd_destroy_device);
1720                rcu_read_lock();
1721        }
1722        rcu_read_unlock();
1723}
1724
1725static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1726{
1727        int rv;
1728        struct p_barrier *p = pi->data;
1729        struct drbd_epoch *epoch;
1730
1731        /* FIXME these are unacked on connection,
1732         * not a specific (peer)device.
1733         */
1734        connection->current_epoch->barrier_nr = p->barrier;
1735        connection->current_epoch->connection = connection;
1736        rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1737
1738        /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1739         * the activity log, which means it would not be resynced in case the
1740         * R_PRIMARY crashes now.
1741         * Therefore we must send the barrier_ack after the barrier request was
1742         * completed. */
1743        switch (connection->resource->write_ordering) {
1744        case WO_NONE:
1745                if (rv == FE_RECYCLED)
1746                        return 0;
1747
1748                /* receiver context, in the writeout path of the other node.
1749                 * avoid potential distributed deadlock */
1750                epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1751                if (epoch)
1752                        break;
1753                else
1754                        drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1755                        /* Fall through */
1756
1757        case WO_BDEV_FLUSH:
1758        case WO_DRAIN_IO:
1759                conn_wait_active_ee_empty(connection);
1760                drbd_flush(connection);
1761
1762                if (atomic_read(&connection->current_epoch->epoch_size)) {
1763                        epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1764                        if (epoch)
1765                                break;
1766                }
1767
1768                return 0;
1769        default:
1770                drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1771                         connection->resource->write_ordering);
1772                return -EIO;
1773        }
1774
1775        epoch->flags = 0;
1776        atomic_set(&epoch->epoch_size, 0);
1777        atomic_set(&epoch->active, 0);
1778
1779        spin_lock(&connection->epoch_lock);
1780        if (atomic_read(&connection->current_epoch->epoch_size)) {
1781                list_add(&epoch->list, &connection->current_epoch->list);
1782                connection->current_epoch = epoch;
1783                connection->epochs++;
1784        } else {
1785                /* The current_epoch got recycled while we allocated this one... */
1786                kfree(epoch);
1787        }
1788        spin_unlock(&connection->epoch_lock);
1789
1790        return 0;
1791}
1792
1793/* quick wrapper in case payload size != request_size (write same) */
1794static void drbd_csum_ee_size(struct crypto_ahash *h,
1795                              struct drbd_peer_request *r, void *d,
1796                              unsigned int payload_size)
1797{
1798        unsigned int tmp = r->i.size;
1799        r->i.size = payload_size;
1800        drbd_csum_ee(h, r, d);
1801        r->i.size = tmp;
1802}
1803
1804/* used from receive_RSDataReply (recv_resync_read)
1805 * and from receive_Data.
1806 * data_size: actual payload ("data in")
1807 *      for normal writes that is bi_size.
1808 *      for discards, that is zero.
1809 *      for write same, it is logical_block_size.
1810 * both trim and write same have the bi_size ("data len to be affected")
1811 * as extra argument in the packet header.
1812 */
1813static struct drbd_peer_request *
1814read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1815              struct packet_info *pi) __must_hold(local)
1816{
1817        struct drbd_device *device = peer_device->device;
1818        const sector_t capacity = drbd_get_capacity(device->this_bdev);
1819        struct drbd_peer_request *peer_req;
1820        struct page *page;
1821        int digest_size, err;
1822        unsigned int data_size = pi->size, ds;
1823        void *dig_in = peer_device->connection->int_dig_in;
1824        void *dig_vv = peer_device->connection->int_dig_vv;
1825        unsigned long *data;
1826        struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1827        struct p_trim *wsame = (pi->cmd == P_WSAME) ? pi->data : NULL;
1828
1829        digest_size = 0;
1830        if (!trim && peer_device->connection->peer_integrity_tfm) {
1831                digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1832                /*
1833                 * FIXME: Receive the incoming digest into the receive buffer
1834                 *        here, together with its struct p_data?
1835                 */
1836                err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1837                if (err)
1838                        return NULL;
1839                data_size -= digest_size;
1840        }
1841
1842        /* assume request_size == data_size, but special case trim and wsame. */
1843        ds = data_size;
1844        if (trim) {
1845                if (!expect(data_size == 0))
1846                        return NULL;
1847                ds = be32_to_cpu(trim->size);
1848        } else if (wsame) {
1849                if (data_size != queue_logical_block_size(device->rq_queue)) {
1850                        drbd_err(peer_device, "data size (%u) != drbd logical block size (%u)\n",
1851                                data_size, queue_logical_block_size(device->rq_queue));
1852                        return NULL;
1853                }
1854                if (data_size != bdev_logical_block_size(device->ldev->backing_bdev)) {
1855                        drbd_err(peer_device, "data size (%u) != backend logical block size (%u)\n",
1856                                data_size, bdev_logical_block_size(device->ldev->backing_bdev));
1857                        return NULL;
1858                }
1859                ds = be32_to_cpu(wsame->size);
1860        }
1861
1862        if (!expect(IS_ALIGNED(ds, 512)))
1863                return NULL;
1864        if (trim || wsame) {
1865                if (!expect(ds <= (DRBD_MAX_BBIO_SECTORS << 9)))
1866                        return NULL;
1867        } else if (!expect(ds <= DRBD_MAX_BIO_SIZE))
1868                return NULL;
1869
1870        /* even though we trust out peer,
1871         * we sometimes have to double check. */
1872        if (sector + (ds>>9) > capacity) {
1873                drbd_err(device, "request from peer beyond end of local disk: "
1874                        "capacity: %llus < sector: %llus + size: %u\n",
1875                        (unsigned long long)capacity,
1876                        (unsigned long long)sector, ds);
1877                return NULL;
1878        }
1879
1880        /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1881         * "criss-cross" setup, that might cause write-out on some other DRBD,
1882         * which in turn might block on the other node at this very place.  */
1883        peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO);
1884        if (!peer_req)
1885                return NULL;
1886
1887        peer_req->flags |= EE_WRITE;
1888        if (trim) {
1889                peer_req->flags |= EE_IS_TRIM;
1890                return peer_req;
1891        }
1892        if (wsame)
1893                peer_req->flags |= EE_WRITE_SAME;
1894
1895        /* receive payload size bytes into page chain */
1896        ds = data_size;
1897        page = peer_req->pages;
1898        page_chain_for_each(page) {
1899                unsigned len = min_t(int, ds, PAGE_SIZE);
1900                data = kmap(page);
1901                err = drbd_recv_all_warn(peer_device->connection, data, len);
1902                if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1903                        drbd_err(device, "Fault injection: Corrupting data on receive\n");
1904                        data[0] = data[0] ^ (unsigned long)-1;
1905                }
1906                kunmap(page);
1907                if (err) {
1908                        drbd_free_peer_req(device, peer_req);
1909                        return NULL;
1910                }
1911                ds -= len;
1912        }
1913
1914        if (digest_size) {
1915                drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size);
1916                if (memcmp(dig_in, dig_vv, digest_size)) {
1917                        drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1918                                (unsigned long long)sector, data_size);
1919                        drbd_free_peer_req(device, peer_req);
1920                        return NULL;
1921                }
1922        }
1923        device->recv_cnt += data_size >> 9;
1924        return peer_req;
1925}
1926
1927/* drbd_drain_block() just takes a data block
1928 * out of the socket input buffer, and discards it.
1929 */
1930static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1931{
1932        struct page *page;
1933        int err = 0;
1934        void *data;
1935
1936        if (!data_size)
1937                return 0;
1938
1939        page = drbd_alloc_pages(peer_device, 1, 1);
1940
1941        data = kmap(page);
1942        while (data_size) {
1943                unsigned int len = min_t(int, data_size, PAGE_SIZE);
1944
1945                err = drbd_recv_all_warn(peer_device->connection, data, len);
1946                if (err)
1947                        break;
1948                data_size -= len;
1949        }
1950        kunmap(page);
1951        drbd_free_pages(peer_device->device, page, 0);
1952        return err;
1953}
1954
1955static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1956                           sector_t sector, int data_size)
1957{
1958        struct bio_vec bvec;
1959        struct bvec_iter iter;
1960        struct bio *bio;
1961        int digest_size, err, expect;
1962        void *dig_in = peer_device->connection->int_dig_in;
1963        void *dig_vv = peer_device->connection->int_dig_vv;
1964
1965        digest_size = 0;
1966        if (peer_device->connection->peer_integrity_tfm) {
1967                digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1968                err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1969                if (err)
1970                        return err;
1971                data_size -= digest_size;
1972        }
1973
1974        /* optimistically update recv_cnt.  if receiving fails below,
1975         * we disconnect anyways, and counters will be reset. */
1976        peer_device->device->recv_cnt += data_size>>9;
1977
1978        bio = req->master_bio;
1979        D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1980
1981        bio_for_each_segment(bvec, bio, iter) {
1982                void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1983                expect = min_t(int, data_size, bvec.bv_len);
1984                err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1985                kunmap(bvec.bv_page);
1986                if (err)
1987                        return err;
1988                data_size -= expect;
1989        }
1990
1991        if (digest_size) {
1992                drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1993                if (memcmp(dig_in, dig_vv, digest_size)) {
1994                        drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1995                        return -EINVAL;
1996                }
1997        }
1998
1999        D_ASSERT(peer_device->device, data_size == 0);
2000        return 0;
2001}
2002
2003/*
2004 * e_end_resync_block() is called in ack_sender context via
2005 * drbd_finish_peer_reqs().
2006 */
2007static int e_end_resync_block(struct drbd_work *w, int unused)
2008{
2009        struct drbd_peer_request *peer_req =
2010                container_of(w, struct drbd_peer_request, w);
2011        struct drbd_peer_device *peer_device = peer_req->peer_device;
2012        struct drbd_device *device = peer_device->device;
2013        sector_t sector = peer_req->i.sector;
2014        int err;
2015
2016        D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2017
2018        if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2019                drbd_set_in_sync(device, sector, peer_req->i.size);
2020                err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
2021        } else {
2022                /* Record failure to sync */
2023                drbd_rs_failed_io(device, sector, peer_req->i.size);
2024
2025                err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2026        }
2027        dec_unacked(device);
2028
2029        return err;
2030}
2031
2032static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
2033                            struct packet_info *pi) __releases(local)
2034{
2035        struct drbd_device *device = peer_device->device;
2036        struct drbd_peer_request *peer_req;
2037
2038        peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
2039        if (!peer_req)
2040                goto fail;
2041
2042        dec_rs_pending(device);
2043
2044        inc_unacked(device);
2045        /* corresponding dec_unacked() in e_end_resync_block()
2046         * respective _drbd_clear_done_ee */
2047
2048        peer_req->w.cb = e_end_resync_block;
2049        peer_req->submit_jif = jiffies;
2050
2051        spin_lock_irq(&device->resource->req_lock);
2052        list_add_tail(&peer_req->w.list, &device->sync_ee);
2053        spin_unlock_irq(&device->resource->req_lock);
2054
2055        atomic_add(pi->size >> 9, &device->rs_sect_ev);
2056        if (drbd_submit_peer_request(device, peer_req, REQ_OP_WRITE, 0,
2057                                     DRBD_FAULT_RS_WR) == 0)
2058                return 0;
2059
2060        /* don't care for the reason here */
2061        drbd_err(device, "submit failed, triggering re-connect\n");
2062        spin_lock_irq(&device->resource->req_lock);
2063        list_del(&peer_req->w.list);
2064        spin_unlock_irq(&device->resource->req_lock);
2065
2066        drbd_free_peer_req(device, peer_req);
2067fail:
2068        put_ldev(device);
2069        return -EIO;
2070}
2071
2072static struct drbd_request *
2073find_request(struct drbd_device *device, struct rb_root *root, u64 id,
2074             sector_t sector, bool missing_ok, const char *func)
2075{
2076        struct drbd_request *req;
2077
2078        /* Request object according to our peer */
2079        req = (struct drbd_request *)(unsigned long)id;
2080        if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
2081                return req;
2082        if (!missing_ok) {
2083                drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
2084                        (unsigned long)id, (unsigned long long)sector);
2085        }
2086        return NULL;
2087}
2088
2089static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
2090{
2091        struct drbd_peer_device *peer_device;
2092        struct drbd_device *device;
2093        struct drbd_request *req;
2094        sector_t sector;
2095        int err;
2096        struct p_data *p = pi->data;
2097
2098        peer_device = conn_peer_device(connection, pi->vnr);
2099        if (!peer_device)
2100                return -EIO;
2101        device = peer_device->device;
2102
2103        sector = be64_to_cpu(p->sector);
2104
2105        spin_lock_irq(&device->resource->req_lock);
2106        req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
2107        spin_unlock_irq(&device->resource->req_lock);
2108        if (unlikely(!req))
2109                return -EIO;
2110
2111        /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
2112         * special casing it there for the various failure cases.
2113         * still no race with drbd_fail_pending_reads */
2114        err = recv_dless_read(peer_device, req, sector, pi->size);
2115        if (!err)
2116                req_mod(req, DATA_RECEIVED);
2117        /* else: nothing. handled from drbd_disconnect...
2118         * I don't think we may complete this just yet
2119         * in case we are "on-disconnect: freeze" */
2120
2121        return err;
2122}
2123
2124static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
2125{
2126        struct drbd_peer_device *peer_device;
2127        struct drbd_device *device;
2128        sector_t sector;
2129        int err;
2130        struct p_data *p = pi->data;
2131
2132        peer_device = conn_peer_device(connection, pi->vnr);
2133        if (!peer_device)
2134                return -EIO;
2135        device = peer_device->device;
2136
2137        sector = be64_to_cpu(p->sector);
2138        D_ASSERT(device, p->block_id == ID_SYNCER);
2139
2140        if (get_ldev(device)) {
2141                /* data is submitted to disk within recv_resync_read.
2142                 * corresponding put_ldev done below on error,
2143                 * or in drbd_peer_request_endio. */
2144                err = recv_resync_read(peer_device, sector, pi);
2145        } else {
2146                if (__ratelimit(&drbd_ratelimit_state))
2147                        drbd_err(device, "Can not write resync data to local disk.\n");
2148
2149                err = drbd_drain_block(peer_device, pi->size);
2150
2151                drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2152        }
2153
2154        atomic_add(pi->size >> 9, &device->rs_sect_in);
2155
2156        return err;
2157}
2158
2159static void restart_conflicting_writes(struct drbd_device *device,
2160                                       sector_t sector, int size)
2161{
2162        struct drbd_interval *i;
2163        struct drbd_request *req;
2164
2165        drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2166                if (!i->local)
2167                        continue;
2168                req = container_of(i, struct drbd_request, i);
2169                if (req->rq_state & RQ_LOCAL_PENDING ||
2170                    !(req->rq_state & RQ_POSTPONED))
2171                        continue;
2172                /* as it is RQ_POSTPONED, this will cause it to
2173                 * be queued on the retry workqueue. */
2174                __req_mod(req, CONFLICT_RESOLVED, NULL);
2175        }
2176}
2177
2178/*
2179 * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
2180 */
2181static int e_end_block(struct drbd_work *w, int cancel)
2182{
2183        struct drbd_peer_request *peer_req =
2184                container_of(w, struct drbd_peer_request, w);
2185        struct drbd_peer_device *peer_device = peer_req->peer_device;
2186        struct drbd_device *device = peer_device->device;
2187        sector_t sector = peer_req->i.sector;
2188        int err = 0, pcmd;
2189
2190        if (peer_req->flags & EE_SEND_WRITE_ACK) {
2191                if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2192                        pcmd = (device->state.conn >= C_SYNC_SOURCE &&
2193                                device->state.conn <= C_PAUSED_SYNC_T &&
2194                                peer_req->flags & EE_MAY_SET_IN_SYNC) ?
2195                                P_RS_WRITE_ACK : P_WRITE_ACK;
2196                        err = drbd_send_ack(peer_device, pcmd, peer_req);
2197                        if (pcmd == P_RS_WRITE_ACK)
2198                                drbd_set_in_sync(device, sector, peer_req->i.size);
2199                } else {
2200                        err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2201                        /* we expect it to be marked out of sync anyways...
2202                         * maybe assert this?  */
2203                }
2204                dec_unacked(device);
2205        }
2206
2207        /* we delete from the conflict detection hash _after_ we sent out the
2208         * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
2209        if (peer_req->flags & EE_IN_INTERVAL_TREE) {
2210                spin_lock_irq(&device->resource->req_lock);
2211                D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
2212                drbd_remove_epoch_entry_interval(device, peer_req);
2213                if (peer_req->flags & EE_RESTART_REQUESTS)
2214                        restart_conflicting_writes(device, sector, peer_req->i.size);
2215                spin_unlock_irq(&device->resource->req_lock);
2216        } else
2217                D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2218
2219        drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
2220
2221        return err;
2222}
2223
2224static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2225{
2226        struct drbd_peer_request *peer_req =
2227                container_of(w, struct drbd_peer_request, w);
2228        struct drbd_peer_device *peer_device = peer_req->peer_device;
2229        int err;
2230
2231        err = drbd_send_ack(peer_device, ack, peer_req);
2232        dec_unacked(peer_device->device);
2233
2234        return err;
2235}
2236
2237static int e_send_superseded(struct drbd_work *w, int unused)
2238{
2239        return e_send_ack(w, P_SUPERSEDED);
2240}
2241
2242static int e_send_retry_write(struct drbd_work *w, int unused)
2243{
2244        struct drbd_peer_request *peer_req =
2245                container_of(w, struct drbd_peer_request, w);
2246        struct drbd_connection *connection = peer_req->peer_device->connection;
2247
2248        return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2249                             P_RETRY_WRITE : P_SUPERSEDED);
2250}
2251
2252static bool seq_greater(u32 a, u32 b)
2253{
2254        /*
2255         * We assume 32-bit wrap-around here.
2256         * For 24-bit wrap-around, we would have to shift:
2257         *  a <<= 8; b <<= 8;
2258         */
2259        return (s32)a - (s32)b > 0;
2260}
2261
2262static u32 seq_max(u32 a, u32 b)
2263{
2264        return seq_greater(a, b) ? a : b;
2265}
2266
2267static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2268{
2269        struct drbd_device *device = peer_device->device;
2270        unsigned int newest_peer_seq;
2271
2272        if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2273                spin_lock(&device->peer_seq_lock);
2274                newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2275                device->peer_seq = newest_peer_seq;
2276                spin_unlock(&device->peer_seq_lock);
2277                /* wake up only if we actually changed device->peer_seq */
2278                if (peer_seq == newest_peer_seq)
2279                        wake_up(&device->seq_wait);
2280        }
2281}
2282
2283static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2284{
2285        return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2286}
2287
2288/* maybe change sync_ee into interval trees as well? */
2289static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2290{
2291        struct drbd_peer_request *rs_req;
2292        bool rv = false;
2293
2294        spin_lock_irq(&device->resource->req_lock);
2295        list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2296                if (overlaps(peer_req->i.sector, peer_req->i.size,
2297                             rs_req->i.sector, rs_req->i.size)) {
2298                        rv = true;
2299                        break;
2300                }
2301        }
2302        spin_unlock_irq(&device->resource->req_lock);
2303
2304        return rv;
2305}
2306
2307/* Called from receive_Data.
2308 * Synchronize packets on sock with packets on msock.
2309 *
2310 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2311 * packet traveling on msock, they are still processed in the order they have
2312 * been sent.
2313 *
2314 * Note: we don't care for Ack packets overtaking P_DATA packets.
2315 *
2316 * In case packet_seq is larger than device->peer_seq number, there are
2317 * outstanding packets on the msock. We wait for them to arrive.
2318 * In case we are the logically next packet, we update device->peer_seq
2319 * ourselves. Correctly handles 32bit wrap around.
2320 *
2321 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2322 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2323 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2324 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
2325 *
2326 * returns 0 if we may process the packet,
2327 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2328static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2329{
2330        struct drbd_device *device = peer_device->device;
2331        DEFINE_WAIT(wait);
2332        long timeout;
2333        int ret = 0, tp;
2334
2335        if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2336                return 0;
2337
2338        spin_lock(&device->peer_seq_lock);
2339        for (;;) {
2340                if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2341                        device->peer_seq = seq_max(device->peer_seq, peer_seq);
2342                        break;
2343                }
2344
2345                if (signal_pending(current)) {
2346                        ret = -ERESTARTSYS;
2347                        break;
2348                }
2349
2350                rcu_read_lock();
2351                tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2352                rcu_read_unlock();
2353
2354                if (!tp)
2355                        break;
2356
2357                /* Only need to wait if two_primaries is enabled */
2358                prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2359                spin_unlock(&device->peer_seq_lock);
2360                rcu_read_lock();
2361                timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2362                rcu_read_unlock();
2363                timeout = schedule_timeout(timeout);
2364                spin_lock(&device->peer_seq_lock);
2365                if (!timeout) {
2366                        ret = -ETIMEDOUT;
2367                        drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2368                        break;
2369                }
2370        }
2371        spin_unlock(&device->peer_seq_lock);
2372        finish_wait(&device->seq_wait, &wait);
2373        return ret;
2374}
2375
2376/* see also bio_flags_to_wire()
2377 * DRBD_REQ_*, because we need to semantically map the flags to data packet
2378 * flags and back. We may replicate to other kernel versions. */
2379static unsigned long wire_flags_to_bio_flags(u32 dpf)
2380{
2381        return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2382                (dpf & DP_FUA ? REQ_FUA : 0) |
2383                (dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
2384}
2385
2386static unsigned long wire_flags_to_bio_op(u32 dpf)
2387{
2388        if (dpf & DP_DISCARD)
2389                return REQ_OP_DISCARD;
2390        else
2391                return REQ_OP_WRITE;
2392}
2393
2394static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2395                                    unsigned int size)
2396{
2397        struct drbd_interval *i;
2398
2399    repeat:
2400        drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2401                struct drbd_request *req;
2402                struct bio_and_error m;
2403
2404                if (!i->local)
2405                        continue;
2406                req = container_of(i, struct drbd_request, i);
2407                if (!(req->rq_state & RQ_POSTPONED))
2408                        continue;
2409                req->rq_state &= ~RQ_POSTPONED;
2410                __req_mod(req, NEG_ACKED, &m);
2411                spin_unlock_irq(&device->resource->req_lock);
2412                if (m.bio)
2413                        complete_master_bio(device, &m);
2414                spin_lock_irq(&device->resource->req_lock);
2415                goto repeat;
2416        }
2417}
2418
2419static int handle_write_conflicts(struct drbd_device *device,
2420                                  struct drbd_peer_request *peer_req)
2421{
2422        struct drbd_connection *connection = peer_req->peer_device->connection;
2423        bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2424        sector_t sector = peer_req->i.sector;
2425        const unsigned int size = peer_req->i.size;
2426        struct drbd_interval *i;
2427        bool equal;
2428        int err;
2429
2430        /*
2431         * Inserting the peer request into the write_requests tree will prevent
2432         * new conflicting local requests from being added.
2433         */
2434        drbd_insert_interval(&device->write_requests, &peer_req->i);
2435
2436    repeat:
2437        drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2438                if (i == &peer_req->i)
2439                        continue;
2440                if (i->completed)
2441                        continue;
2442
2443                if (!i->local) {
2444                        /*
2445                         * Our peer has sent a conflicting remote request; this
2446                         * should not happen in a two-node setup.  Wait for the
2447                         * earlier peer request to complete.
2448                         */
2449                        err = drbd_wait_misc(device, i);
2450                        if (err)
2451                                goto out;
2452                        goto repeat;
2453                }
2454
2455                equal = i->sector == sector && i->size == size;
2456                if (resolve_conflicts) {
2457                        /*
2458                         * If the peer request is fully contained within the
2459                         * overlapping request, it can be considered overwritten
2460                         * and thus superseded; otherwise, it will be retried
2461                         * once all overlapping requests have completed.
2462                         */
2463                        bool superseded = i->sector <= sector && i->sector +
2464                                       (i->size >> 9) >= sector + (size >> 9);
2465
2466                        if (!equal)
2467                                drbd_alert(device, "Concurrent writes detected: "
2468                                               "local=%llus +%u, remote=%llus +%u, "
2469                                               "assuming %s came first\n",
2470                                          (unsigned long long)i->sector, i->size,
2471                                          (unsigned long long)sector, size,
2472                                          superseded ? "local" : "remote");
2473
2474                        peer_req->w.cb = superseded ? e_send_superseded :
2475                                                   e_send_retry_write;
2476                        list_add_tail(&peer_req->w.list, &device->done_ee);
2477                        queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);
2478
2479                        err = -ENOENT;
2480                        goto out;
2481                } else {
2482                        struct drbd_request *req =
2483                                container_of(i, struct drbd_request, i);
2484
2485                        if (!equal)
2486                                drbd_alert(device, "Concurrent writes detected: "
2487                                               "local=%llus +%u, remote=%llus +%u\n",
2488                                          (unsigned long long)i->sector, i->size,
2489                                          (unsigned long long)sector, size);
2490
2491                        if (req->rq_state & RQ_LOCAL_PENDING ||
2492                            !(req->rq_state & RQ_POSTPONED)) {
2493                                /*
2494                                 * Wait for the node with the discard flag to
2495                                 * decide if this request has been superseded
2496                                 * or needs to be retried.
2497                                 * Requests that have been superseded will
2498                                 * disappear from the write_requests tree.
2499                                 *
2500                                 * In addition, wait for the conflicting
2501                                 * request to finish locally before submitting
2502                                 * the conflicting peer request.
2503                                 */
2504                                err = drbd_wait_misc(device, &req->i);
2505                                if (err) {
2506                                        _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2507                                        fail_postponed_requests(device, sector, size);
2508                                        goto out;
2509                                }
2510                                goto repeat;
2511                        }
2512                        /*
2513                         * Remember to restart the conflicting requests after
2514                         * the new peer request has completed.
2515                         */
2516                        peer_req->flags |= EE_RESTART_REQUESTS;
2517                }
2518        }
2519        err = 0;
2520
2521    out:
2522        if (err)
2523                drbd_remove_epoch_entry_interval(device, peer_req);
2524        return err;
2525}
2526
2527/* mirrored write */
2528static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2529{
2530        struct drbd_peer_device *peer_device;
2531        struct drbd_device *device;
2532        struct net_conf *nc;
2533        sector_t sector;
2534        struct drbd_peer_request *peer_req;
2535        struct p_data *p = pi->data;
2536        u32 peer_seq = be32_to_cpu(p->seq_num);
2537        int op, op_flags;
2538        u32 dp_flags;
2539        int err, tp;
2540
2541        peer_device = conn_peer_device(connection, pi->vnr);
2542        if (!peer_device)
2543                return -EIO;
2544        device = peer_device->device;
2545
2546        if (!get_ldev(device)) {
2547                int err2;
2548
2549                err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2550                drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2551                atomic_inc(&connection->current_epoch->epoch_size);
2552                err2 = drbd_drain_block(peer_device, pi->size);
2553                if (!err)
2554                        err = err2;
2555                return err;
2556        }
2557
2558        /*
2559         * Corresponding put_ldev done either below (on various errors), or in
2560         * drbd_peer_request_endio, if we successfully submit the data at the
2561         * end of this function.
2562         */
2563
2564        sector = be64_to_cpu(p->sector);
2565        peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2566        if (!peer_req) {
2567                put_ldev(device);
2568                return -EIO;
2569        }
2570
2571        peer_req->w.cb = e_end_block;
2572        peer_req->submit_jif = jiffies;
2573        peer_req->flags |= EE_APPLICATION;
2574
2575        dp_flags = be32_to_cpu(p->dp_flags);
2576        op = wire_flags_to_bio_op(dp_flags);
2577        op_flags = wire_flags_to_bio_flags(dp_flags);
2578        if (pi->cmd == P_TRIM) {
2579                D_ASSERT(peer_device, peer_req->i.size > 0);
2580                D_ASSERT(peer_device, op == REQ_OP_DISCARD);
2581                D_ASSERT(peer_device, peer_req->pages == NULL);
2582        } else if (peer_req->pages == NULL) {
2583                D_ASSERT(device, peer_req->i.size == 0);
2584                D_ASSERT(device, dp_flags & DP_FLUSH);
2585        }
2586
2587        if (dp_flags & DP_MAY_SET_IN_SYNC)
2588                peer_req->flags |= EE_MAY_SET_IN_SYNC;
2589
2590        spin_lock(&connection->epoch_lock);
2591        peer_req->epoch = connection->current_epoch;
2592        atomic_inc(&peer_req->epoch->epoch_size);
2593        atomic_inc(&peer_req->epoch->active);
2594        spin_unlock(&connection->epoch_lock);
2595
2596        rcu_read_lock();
2597        nc = rcu_dereference(peer_device->connection->net_conf);
2598        tp = nc->two_primaries;
2599        if (peer_device->connection->agreed_pro_version < 100) {
2600                switch (nc->wire_protocol) {
2601                case DRBD_PROT_C:
2602                        dp_flags |= DP_SEND_WRITE_ACK;
2603                        break;
2604                case DRBD_PROT_B:
2605                        dp_flags |= DP_SEND_RECEIVE_ACK;
2606                        break;
2607                }
2608        }
2609        rcu_read_unlock();
2610
2611        if (dp_flags & DP_SEND_WRITE_ACK) {
2612                peer_req->flags |= EE_SEND_WRITE_ACK;
2613                inc_unacked(device);
2614                /* corresponding dec_unacked() in e_end_block()
2615                 * respective _drbd_clear_done_ee */
2616        }
2617
2618        if (dp_flags & DP_SEND_RECEIVE_ACK) {
2619                /* I really don't like it that the receiver thread
2620                 * sends on the msock, but anyways */
2621                drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
2622        }
2623
2624        if (tp) {
2625                /* two primaries implies protocol C */
2626                D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2627                peer_req->flags |= EE_IN_INTERVAL_TREE;
2628                err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2629                if (err)
2630                        goto out_interrupted;
2631                spin_lock_irq(&device->resource->req_lock);
2632                err = handle_write_conflicts(device, peer_req);
2633                if (err) {
2634                        spin_unlock_irq(&device->resource->req_lock);
2635                        if (err == -ENOENT) {
2636                                put_ldev(device);
2637                                return 0;
2638                        }
2639                        goto out_interrupted;
2640                }
2641        } else {
2642                update_peer_seq(peer_device, peer_seq);
2643                spin_lock_irq(&device->resource->req_lock);
2644        }
2645        /* TRIM and WRITE_SAME are processed synchronously,
2646         * we wait for all pending requests, respectively wait for
2647         * active_ee to become empty in drbd_submit_peer_request();
2648         * better not add ourselves here. */
2649        if ((peer_req->flags & (EE_IS_TRIM|EE_WRITE_SAME)) == 0)
2650                list_add_tail(&peer_req->w.list, &device->active_ee);
2651        spin_unlock_irq(&device->resource->req_lock);
2652
2653        if (device->state.conn == C_SYNC_TARGET)
2654                wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2655
2656        if (device->state.pdsk < D_INCONSISTENT) {
2657                /* In case we have the only disk of the cluster, */
2658                drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2659                peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2660                drbd_al_begin_io(device, &peer_req->i);
2661                peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2662        }
2663
2664        err = drbd_submit_peer_request(device, peer_req, op, op_flags,
2665                                       DRBD_FAULT_DT_WR);
2666        if (!err)
2667                return 0;
2668
2669        /* don't care for the reason here */
2670        drbd_err(device, "submit failed, triggering re-connect\n");
2671        spin_lock_irq(&device->resource->req_lock);
2672        list_del(&peer_req->w.list);
2673        drbd_remove_epoch_entry_interval(device, peer_req);
2674        spin_unlock_irq(&device->resource->req_lock);
2675        if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2676                peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2677                drbd_al_complete_io(device, &peer_req->i);
2678        }
2679
2680out_interrupted:
2681        drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
2682        put_ldev(device);
2683        drbd_free_peer_req(device, peer_req);
2684        return err;
2685}
2686
2687/* We may throttle resync, if the lower device seems to be busy,
2688 * and current sync rate is above c_min_rate.
2689 *
2690 * To decide whether or not the lower device is busy, we use a scheme similar
2691 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2692 * (more than 64 sectors) of activity we cannot account for with our own resync
2693 * activity, it obviously is "busy".
2694 *
2695 * The current sync rate used here uses only the most recent two step marks,
2696 * to have a short time average so we can react faster.
2697 */
2698bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2699                bool throttle_if_app_is_waiting)
2700{
2701        struct lc_element *tmp;
2702        bool throttle = drbd_rs_c_min_rate_throttle(device);
2703
2704        if (!throttle || throttle_if_app_is_waiting)
2705                return throttle;
2706
2707        spin_lock_irq(&device->al_lock);
2708        tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2709        if (tmp) {
2710                struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2711                if (test_bit(BME_PRIORITY, &bm_ext->flags))
2712                        throttle = false;
2713                /* Do not slow down if app IO is already waiting for this extent,
2714                 * and our progress is necessary for application IO to complete. */
2715        }
2716        spin_unlock_irq(&device->al_lock);
2717
2718        return throttle;
2719}
2720
2721bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2722{
2723        struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2724        unsigned long db, dt, dbdt;
2725        unsigned int c_min_rate;
2726        int curr_events;
2727
2728        rcu_read_lock();
2729        c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2730        rcu_read_unlock();
2731
2732        /* feature disabled? */
2733        if (c_min_rate == 0)
2734                return false;
2735
2736        curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2737                      (int)part_stat_read(&disk->part0, sectors[1]) -
2738                        atomic_read(&device->rs_sect_ev);
2739
2740        if (atomic_read(&device->ap_actlog_cnt)
2741            || curr_events - device->rs_last_events > 64) {
2742                unsigned long rs_left;
2743                int i;
2744
2745                device->rs_last_events = curr_events;
2746
2747                /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2748                 * approx. */
2749                i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2750
2751                if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2752                        rs_left = device->ov_left;
2753                else
2754                        rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2755
2756                dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2757                if (!dt)
2758                        dt++;
2759                db = device->rs_mark_left[i] - rs_left;
2760                dbdt = Bit2KB(db/dt);
2761
2762                if (dbdt > c_min_rate)
2763                        return true;
2764        }
2765        return false;
2766}
2767
2768static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2769{
2770        struct drbd_peer_device *peer_device;
2771        struct drbd_device *device;
2772        sector_t sector;
2773        sector_t capacity;
2774        struct drbd_peer_request *peer_req;
2775        struct digest_info *di = NULL;
2776        int size, verb;
2777        unsigned int fault_type;
2778        struct p_block_req *p = pi->data;
2779
2780        peer_device = conn_peer_device(connection, pi->vnr);
2781        if (!peer_device)
2782                return -EIO;
2783        device = peer_device->device;
2784        capacity = drbd_get_capacity(device->this_bdev);
2785
2786        sector = be64_to_cpu(p->sector);
2787        size   = be32_to_cpu(p->blksize);
2788
2789        if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2790                drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2791                                (unsigned long long)sector, size);
2792                return -EINVAL;
2793        }
2794        if (sector + (size>>9) > capacity) {
2795                drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2796                                (unsigned long long)sector, size);
2797                return -EINVAL;
2798        }
2799
2800        if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2801                verb = 1;
2802                switch (pi->cmd) {
2803                case P_DATA_REQUEST:
2804                        drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2805                        break;
2806                case P_RS_THIN_REQ:
2807                case P_RS_DATA_REQUEST:
2808                case P_CSUM_RS_REQUEST:
2809                case P_OV_REQUEST:
2810                        drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2811                        break;
2812                case P_OV_REPLY:
2813                        verb = 0;
2814                        dec_rs_pending(device);
2815                        drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2816                        break;
2817                default:
2818                        BUG();
2819                }
2820                if (verb && __ratelimit(&drbd_ratelimit_state))
2821                        drbd_err(device, "Can not satisfy peer's read request, "
2822                            "no local data.\n");
2823
2824                /* drain possibly payload */
2825                return drbd_drain_block(peer_device, pi->size);
2826        }
2827
2828        /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2829         * "criss-cross" setup, that might cause write-out on some other DRBD,
2830         * which in turn might block on the other node at this very place.  */
2831        peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2832                        size, GFP_NOIO);
2833        if (!peer_req) {
2834                put_ldev(device);
2835                return -ENOMEM;
2836        }
2837
2838        switch (pi->cmd) {
2839        case P_DATA_REQUEST:
2840                peer_req->w.cb = w_e_end_data_req;
2841                fault_type = DRBD_FAULT_DT_RD;
2842                /* application IO, don't drbd_rs_begin_io */
2843                peer_req->flags |= EE_APPLICATION;
2844                goto submit;
2845
2846        case P_RS_THIN_REQ:
2847                /* If at some point in the future we have a smart way to
2848                   find out if this data block is completely deallocated,
2849                   then we would do something smarter here than reading
2850                   the block... */
2851                peer_req->flags |= EE_RS_THIN_REQ;
2852        case P_RS_DATA_REQUEST:
2853                peer_req->w.cb = w_e_end_rsdata_req;
2854                fault_type = DRBD_FAULT_RS_RD;
2855                /* used in the sector offset progress display */
2856                device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2857                break;
2858
2859        case P_OV_REPLY:
2860        case P_CSUM_RS_REQUEST:
2861                fault_type = DRBD_FAULT_RS_RD;
2862                di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2863                if (!di)
2864                        goto out_free_e;
2865
2866                di->digest_size = pi->size;
2867                di->digest = (((char *)di)+sizeof(struct digest_info));
2868
2869                peer_req->digest = di;
2870                peer_req->flags |= EE_HAS_DIGEST;
2871
2872                if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2873                        goto out_free_e;
2874
2875                if (pi->cmd == P_CSUM_RS_REQUEST) {
2876                        D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2877                        peer_req->w.cb = w_e_end_csum_rs_req;
2878                        /* used in the sector offset progress display */
2879                        device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2880                        /* remember to report stats in drbd_resync_finished */
2881                        device->use_csums = true;
2882                } else if (pi->cmd == P_OV_REPLY) {
2883                        /* track progress, we may need to throttle */
2884                        atomic_add(size >> 9, &device->rs_sect_in);
2885                        peer_req->w.cb = w_e_end_ov_reply;
2886                        dec_rs_pending(device);
2887                        /* drbd_rs_begin_io done when we sent this request,
2888                         * but accounting still needs to be done. */
2889                        goto submit_for_resync;
2890                }
2891                break;
2892
2893        case P_OV_REQUEST:
2894                if (device->ov_start_sector == ~(sector_t)0 &&
2895                    peer_device->connection->agreed_pro_version >= 90) {
2896                        unsigned long now = jiffies;
2897                        int i;
2898                        device->ov_start_sector = sector;
2899                        device->ov_position = sector;
2900                        device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2901                        device->rs_total = device->ov_left;
2902                        for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2903                                device->rs_mark_left[i] = device->ov_left;
2904                                device->rs_mark_time[i] = now;
2905                        }
2906                        drbd_info(device, "Online Verify start sector: %llu\n",
2907                                        (unsigned long long)sector);
2908                }
2909                peer_req->w.cb = w_e_end_ov_req;
2910                fault_type = DRBD_FAULT_RS_RD;
2911                break;
2912
2913        default:
2914                BUG();
2915        }
2916
2917        /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2918         * wrt the receiver, but it is not as straightforward as it may seem.
2919         * Various places in the resync start and stop logic assume resync
2920         * requests are processed in order, requeuing this on the worker thread
2921         * introduces a bunch of new code for synchronization between threads.
2922         *
2923         * Unlimited throttling before drbd_rs_begin_io may stall the resync
2924         * "forever", throttling after drbd_rs_begin_io will lock that extent
2925         * for application writes for the same time.  For now, just throttle
2926         * here, where the rest of the code expects the receiver to sleep for
2927         * a while, anyways.
2928         */
2929
2930        /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2931         * this defers syncer requests for some time, before letting at least
2932         * on request through.  The resync controller on the receiving side
2933         * will adapt to the incoming rate accordingly.
2934         *
2935         * We cannot throttle here if remote is Primary/SyncTarget:
2936         * we would also throttle its application reads.
2937         * In that case, throttling is done on the SyncTarget only.
2938         */
2939
2940        /* Even though this may be a resync request, we do add to "read_ee";
2941         * "sync_ee" is only used for resync WRITEs.
2942         * Add to list early, so debugfs can find this request
2943         * even if we have to sleep below. */
2944        spin_lock_irq(&device->resource->req_lock);
2945        list_add_tail(&peer_req->w.list, &device->read_ee);
2946        spin_unlock_irq(&device->resource->req_lock);
2947
2948        update_receiver_timing_details(connection, drbd_rs_should_slow_down);
2949        if (device->state.peer != R_PRIMARY
2950        && drbd_rs_should_slow_down(device, sector, false))
2951                schedule_timeout_uninterruptible(HZ/10);
2952        update_receiver_timing_details(connection, drbd_rs_begin_io);
2953        if (drbd_rs_begin_io(device, sector))
2954                goto out_free_e;
2955
2956submit_for_resync:
2957        atomic_add(size >> 9, &device->rs_sect_ev);
2958
2959submit:
2960        update_receiver_timing_details(connection, drbd_submit_peer_request);
2961        inc_unacked(device);
2962        if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
2963                                     fault_type) == 0)
2964                return 0;
2965
2966        /* don't care for the reason here */
2967        drbd_err(device, "submit failed, triggering re-connect\n");
2968
2969out_free_e:
2970        spin_lock_irq(&device->resource->req_lock);
2971        list_del(&peer_req->w.list);
2972        spin_unlock_irq(&device->resource->req_lock);
2973        /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2974
2975        put_ldev(device);
2976        drbd_free_peer_req(device, peer_req);
2977        return -EIO;
2978}
2979
2980/**
2981 * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
2982 */
2983static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2984{
2985        struct drbd_device *device = peer_device->device;
2986        int self, peer, rv = -100;
2987        unsigned long ch_self, ch_peer;
2988        enum drbd_after_sb_p after_sb_0p;
2989
2990        self = device->ldev->md.uuid[UI_BITMAP] & 1;
2991        peer = device->p_uuid[UI_BITMAP] & 1;
2992
2993        ch_peer = device->p_uuid[UI_SIZE];
2994        ch_self = device->comm_bm_set;
2995
2996        rcu_read_lock();
2997        after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2998        rcu_read_unlock();
2999        switch (after_sb_0p) {
3000        case ASB_CONSENSUS:
3001        case ASB_DISCARD_SECONDARY:
3002        case ASB_CALL_HELPER:
3003        case ASB_VIOLENTLY:
3004                drbd_err(device, "Configuration error.\n");
3005                break;
3006        case ASB_DISCONNECT:
3007                break;
3008        case ASB_DISCARD_YOUNGER_PRI:
3009                if (self == 0 && peer == 1) {
3010                        rv = -1;
3011                        break;
3012                }
3013                if (self == 1 && peer == 0) {
3014                        rv =  1;
3015                        break;
3016                }
3017                /* Else fall through to one of the other strategies... */
3018        case ASB_DISCARD_OLDER_PRI:
3019                if (self == 0 && peer == 1) {
3020                        rv = 1;
3021                        break;
3022                }
3023                if (self == 1 && peer == 0) {
3024                        rv = -1;
3025                        break;
3026                }
3027                /* Else fall through to one of the other strategies... */
3028                drbd_warn(device, "Discard younger/older primary did not find a decision\n"
3029                     "Using discard-least-changes instead\n");
3030        case ASB_DISCARD_ZERO_CHG:
3031                if (ch_peer == 0 && ch_self == 0) {
3032                        rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
3033                                ? -1 : 1;
3034                        break;
3035                } else {
3036                        if (ch_peer == 0) { rv =  1; break; }
3037                        if (ch_self == 0) { rv = -1; break; }
3038                }
3039                if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
3040                        break;
3041        case ASB_DISCARD_LEAST_CHG:
3042                if      (ch_self < ch_peer)
3043                        rv = -1;
3044                else if (ch_self > ch_peer)
3045                        rv =  1;
3046                else /* ( ch_self == ch_peer ) */
3047                     /* Well, then use something else. */
3048                        rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
3049                                ? -1 : 1;
3050                break;
3051        case ASB_DISCARD_LOCAL:
3052                rv = -1;
3053                break;
3054        case ASB_DISCARD_REMOTE:
3055                rv =  1;
3056        }
3057
3058        return rv;
3059}
3060
3061/**
3062 * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
3063 */
3064static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
3065{
3066        struct drbd_device *device = peer_device->device;
3067        int hg, rv = -100;
3068        enum drbd_after_sb_p after_sb_1p;
3069
3070        rcu_read_lock();
3071        after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
3072        rcu_read_unlock();
3073        switch (after_sb_1p) {
3074        case ASB_DISCARD_YOUNGER_PRI:
3075        case ASB_DISCARD_OLDER_PRI:
3076        case ASB_DISCARD_LEAST_CHG:
3077        case ASB_DISCARD_LOCAL:
3078        case ASB_DISCARD_REMOTE:
3079        case ASB_DISCARD_ZERO_CHG:
3080                drbd_err(device, "Configuration error.\n");
3081                break;
3082        case ASB_DISCONNECT:
3083                break;
3084        case ASB_CONSENSUS:
3085                hg = drbd_asb_recover_0p(peer_device);
3086                if (hg == -1 && device->state.role == R_SECONDARY)
3087                        rv = hg;
3088                if (hg == 1  && device->state.role == R_PRIMARY)
3089                        rv = hg;
3090                break;
3091        case ASB_VIOLENTLY:
3092                rv = drbd_asb_recover_0p(peer_device);
3093                break;
3094        case ASB_DISCARD_SECONDARY:
3095                return device->state.role == R_PRIMARY ? 1 : -1;
3096        case ASB_CALL_HELPER:
3097                hg = drbd_asb_recover_0p(peer_device);
3098                if (hg == -1 && device->state.role == R_PRIMARY) {
3099                        enum drbd_state_rv rv2;
3100
3101                         /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3102                          * we might be here in C_WF_REPORT_PARAMS which is transient.
3103                          * we do not need to wait for the after state change work either. */
3104                        rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3105                        if (rv2 != SS_SUCCESS) {
3106                                drbd_khelper(device, "pri-lost-after-sb");
3107                        } else {
3108                                drbd_warn(device, "Successfully gave up primary role.\n");
3109                                rv = hg;
3110                        }
3111                } else
3112                        rv = hg;
3113        }
3114
3115        return rv;
3116}
3117
3118/**
3119 * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
3120 */
3121static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
3122{
3123        struct drbd_device *device = peer_device->device;
3124        int hg, rv = -100;
3125        enum drbd_after_sb_p after_sb_2p;
3126
3127        rcu_read_lock();
3128        after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
3129        rcu_read_unlock();
3130        switch (after_sb_2p) {
3131        case ASB_DISCARD_YOUNGER_PRI:
3132        case ASB_DISCARD_OLDER_PRI:
3133        case ASB_DISCARD_LEAST_CHG:
3134        case ASB_DISCARD_LOCAL:
3135        case ASB_DISCARD_REMOTE:
3136        case ASB_CONSENSUS:
3137        case ASB_DISCARD_SECONDARY:
3138        case ASB_DISCARD_ZERO_CHG:
3139                drbd_err(device, "Configuration error.\n");
3140                break;
3141        case ASB_VIOLENTLY:
3142                rv = drbd_asb_recover_0p(peer_device);
3143                break;
3144        case ASB_DISCONNECT:
3145                break;
3146        case ASB_CALL_HELPER:
3147                hg = drbd_asb_recover_0p(peer_device);
3148                if (hg == -1) {
3149                        enum drbd_state_rv rv2;
3150
3151                         /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3152                          * we might be here in C_WF_REPORT_PARAMS which is transient.
3153                          * we do not need to wait for the after state change work either. */
3154                        rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3155                        if (rv2 != SS_SUCCESS) {
3156                                drbd_khelper(device, "pri-lost-after-sb");
3157                        } else {
3158                                drbd_warn(device, "Successfully gave up primary role.\n");
3159                                rv = hg;
3160                        }
3161                } else
3162                        rv = hg;
3163        }
3164
3165        return rv;
3166}
3167
3168static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
3169                           u64 bits, u64 flags)
3170{
3171        if (!uuid) {
3172                drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
3173                return;
3174        }
3175        drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
3176             text,
3177             (unsigned long long)uuid[UI_CURRENT],
3178             (unsigned long long)uuid[UI_BITMAP],
3179             (unsigned long long)uuid[UI_HISTORY_START],
3180             (unsigned long long)uuid[UI_HISTORY_END],
3181             (unsigned long long)bits,
3182             (unsigned long long)flags);
3183}
3184
3185/*
3186  100   after split brain try auto recover
3187    2   C_SYNC_SOURCE set BitMap
3188    1   C_SYNC_SOURCE use BitMap
3189    0   no Sync
3190   -1   C_SYNC_TARGET use BitMap
3191   -2   C_SYNC_TARGET set BitMap
3192 -100   after split brain, disconnect
3193-1000   unrelated data
3194-1091   requires proto 91
3195-1096   requires proto 96
3196 */
3197
3198static int drbd_uuid_compare(struct drbd_device *const device, enum drbd_role const peer_role, int *rule_nr) __must_hold(local)
3199{
3200        struct drbd_peer_device *const peer_device = first_peer_device(device);
3201        struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
3202        u64 self, peer;
3203        int i, j;
3204
3205        self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3206        peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3207
3208        *rule_nr = 10;
3209        if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
3210                return 0;
3211
3212        *rule_nr = 20;
3213        if ((self == UUID_JUST_CREATED || self == (u64)0) &&
3214             peer != UUID_JUST_CREATED)
3215                return -2;
3216
3217        *rule_nr = 30;
3218        if (self != UUID_JUST_CREATED &&
3219            (peer == UUID_JUST_CREATED || peer == (u64)0))
3220                return 2;
3221
3222        if (self == peer) {
3223                int rct, dc; /* roles at crash time */
3224
3225                if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
3226
3227                        if (connection->agreed_pro_version < 91)
3228                                return -1091;
3229
3230                        if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
3231                            (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
3232                                drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
3233                                drbd_uuid_move_history(device);
3234                                device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
3235                                device->ldev->md.uuid[UI_BITMAP] = 0;
3236
3237                                drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3238                                               device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3239                                *rule_nr = 34;
3240                        } else {
3241                                drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
3242                                *rule_nr = 36;
3243                        }
3244
3245                        return 1;
3246                }
3247
3248                if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
3249
3250                        if (connection->agreed_pro_version < 91)
3251                                return -1091;
3252
3253                        if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
3254                            (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
3255                                drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
3256
3257                                device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
3258                                device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
3259                                device->p_uuid[UI_BITMAP] = 0UL;
3260
3261                                drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3262                                *rule_nr = 35;
3263                        } else {
3264                                drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3265                                *rule_nr = 37;
3266                        }
3267
3268                        return -1;
3269                }
3270
3271                /* Common power [off|failure] */
3272                rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3273                        (device->p_uuid[UI_FLAGS] & 2);
3274                /* lowest bit is set when we were primary,
3275                 * next bit (weight 2) is set when peer was primary */
3276                *rule_nr = 40;
3277
3278                /* Neither has the "crashed primary" flag set,
3279                 * only a replication link hickup. */
3280                if (rct == 0)
3281                        return 0;
3282
3283                /* Current UUID equal and no bitmap uuid; does not necessarily
3284                 * mean this was a "simultaneous hard crash", maybe IO was
3285                 * frozen, so no UUID-bump happened.
3286                 * This is a protocol change, overload DRBD_FF_WSAME as flag
3287                 * for "new-enough" peer DRBD version. */
3288                if (device->state.role == R_PRIMARY || peer_role == R_PRIMARY) {
3289                        *rule_nr = 41;
3290                        if (!(connection->agreed_features & DRBD_FF_WSAME)) {
3291                                drbd_warn(peer_device, "Equivalent unrotated UUIDs, but current primary present.\n");
3292                                return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8));
3293                        }
3294                        if (device->state.role == R_PRIMARY && peer_role == R_PRIMARY) {
3295                                /* At least one has the "crashed primary" bit set,
3296                                 * both are primary now, but neither has rotated its UUIDs?
3297                                 * "Can not happen." */
3298                                drbd_err(peer_device, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n");
3299                                return -100;
3300                        }
3301                        if (device->state.role == R_PRIMARY)
3302                                return 1;
3303                        return -1;
3304                }
3305
3306                /* Both are secondary.
3307                 * Really looks like recovery from simultaneous hard crash.
3308                 * Check which had been primary before, and arbitrate. */
3309                switch (rct) {
3310                case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */
3311                case 1: /*  self_pri && !peer_pri */ return 1;
3312                case 2: /* !self_pri &&  peer_pri */ return -1;
3313                case 3: /*  self_pri &&  peer_pri */
3314                        dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3315                        return dc ? -1 : 1;
3316                }
3317        }
3318
3319        *rule_nr = 50;
3320        peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3321        if (self == peer)
3322                return -1;
3323
3324        *rule_nr = 51;
3325        peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3326        if (self == peer) {
3327                if (connection->agreed_pro_version < 96 ?
3328                    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3329                    (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3330                    peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3331                        /* The last P_SYNC_UUID did not get though. Undo the last start of
3332                           resync as sync source modifications of the peer's UUIDs. */
3333
3334                        if (connection->agreed_pro_version < 91)
3335                                return -1091;
3336
3337                        device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3338                        device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3339
3340                        drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3341                        drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3342
3343                        return -1;
3344                }
3345        }
3346
3347        *rule_nr = 60;
3348        self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3349        for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3350                peer = device->p_uuid[i] & ~((u64)1);
3351                if (self == peer)
3352                        return -2;
3353        }
3354
3355        *rule_nr = 70;
3356        self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3357        peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3358        if (self == peer)
3359                return 1;
3360
3361        *rule_nr = 71;
3362        self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3363        if (self == peer) {
3364                if (connection->agreed_pro_version < 96 ?
3365                    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3366                    (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3367                    self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3368                        /* The last P_SYNC_UUID did not get though. Undo the last start of
3369                           resync as sync source modifications of our UUIDs. */
3370
3371                        if (connection->agreed_pro_version < 91)
3372                                return -1091;
3373
3374                        __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3375                        __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3376
3377                        drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3378                        drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3379                                       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3380
3381                        return 1;
3382                }
3383        }
3384
3385
3386        *rule_nr = 80;
3387        peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3388        for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3389                self = device->ldev->md.uuid[i] & ~((u64)1);
3390                if (self == peer)
3391                        return 2;
3392        }
3393
3394        *rule_nr = 90;
3395        self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3396        peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3397        if (self == peer && self != ((u64)0))
3398                return 100;
3399
3400        *rule_nr = 100;
3401        for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3402                self = device->ldev->md.uuid[i] & ~((u64)1);
3403                for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3404                        peer = device->p_uuid[j] & ~((u64)1);
3405                        if (self == peer)
3406                                return -100;
3407                }
3408        }
3409
3410        return -1000;
3411}
3412
3413/* drbd_sync_handshake() returns the new conn state on success, or
3414   CONN_MASK (-1) on failure.
3415 */
3416static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3417                                           enum drbd_role peer_role,
3418                                           enum drbd_disk_state peer_disk) __must_hold(local)
3419{
3420        struct drbd_device *device = peer_device->device;
3421        enum drbd_conns rv = C_MASK;
3422        enum drbd_disk_state mydisk;
3423        struct net_conf *nc;
3424        int hg, rule_nr, rr_conflict, tentative;
3425
3426        mydisk = device->state.disk;
3427        if (mydisk == D_NEGOTIATING)
3428                mydisk = device->new_state_tmp.disk;
3429
3430        drbd_info(device, "drbd_sync_handshake:\n");
3431
3432        spin_lock_irq(&device->ldev->md.uuid_lock);
3433        drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3434        drbd_uuid_dump(device, "peer", device->p_uuid,
3435                       device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3436
3437        hg = drbd_uuid_compare(device, peer_role, &rule_nr);
3438        spin_unlock_irq(&device->ldev->md.uuid_lock);
3439
3440        drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3441
3442        if (hg == -1000) {
3443                drbd_alert(device, "Unrelated data, aborting!\n");
3444                return C_MASK;
3445        }
3446        if (hg < -0x10000) {
3447                int proto, fflags;
3448                hg = -hg;
3449                proto = hg & 0xff;
3450                fflags = (hg >> 8) & 0xff;
3451                drbd_alert(device, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n",
3452                                        proto, fflags);
3453                return C_MASK;
3454        }
3455        if (hg < -1000) {
3456                drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3457                return C_MASK;
3458        }
3459
3460        if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3461            (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3462                int f = (hg == -100) || abs(hg) == 2;
3463                hg = mydisk > D_INCONSISTENT ? 1 : -1;
3464                if (f)
3465                        hg = hg*2;
3466                drbd_info(device, "Becoming sync %s due to disk states.\n",
3467                     hg > 0 ? "source" : "target");
3468        }
3469
3470        if (abs(hg) == 100)
3471                drbd_khelper(device, "initial-split-brain");
3472
3473        rcu_read_lock();
3474        nc = rcu_dereference(peer_device->connection->net_conf);
3475
3476        if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3477                int pcount = (device->state.role == R_PRIMARY)
3478                           + (peer_role == R_PRIMARY);
3479                int forced = (hg == -100);
3480
3481                switch (pcount) {
3482                case 0:
3483                        hg = drbd_asb_recover_0p(peer_device);
3484                        break;
3485                case 1:
3486                        hg = drbd_asb_recover_1p(peer_device);
3487                        break;
3488                case 2:
3489                        hg = drbd_asb_recover_2p(peer_device);
3490                        break;
3491                }
3492                if (abs(hg) < 100) {
3493                        drbd_warn(device, "Split-Brain detected, %d primaries, "
3494                             "automatically solved. Sync from %s node\n",
3495                             pcount, (hg < 0) ? "peer" : "this");
3496                        if (forced) {
3497                                drbd_warn(device, "Doing a full sync, since"
3498                                     " UUIDs where ambiguous.\n");
3499                                hg = hg*2;
3500                        }
3501                }
3502        }
3503
3504        if (hg == -100) {
3505                if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3506                        hg = -1;
3507                if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3508                        hg = 1;
3509
3510                if (abs(hg) < 100)
3511                        drbd_warn(device, "Split-Brain detected, manually solved. "
3512                             "Sync from %s node\n",
3513                             (hg < 0) ? "peer" : "this");
3514        }
3515        rr_conflict = nc->rr_conflict;
3516        tentative = nc->tentative;
3517        rcu_read_unlock();
3518
3519        if (hg == -100) {
3520                /* FIXME this log message is not correct if we end up here
3521                 * after an attempted attach on a diskless node.
3522                 * We just refuse to attach -- well, we drop the "connection"
3523                 * to that disk, in a way... */
3524                drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3525                drbd_khelper(device, "split-brain");
3526                return C_MASK;
3527        }
3528
3529        if (hg > 0 && mydisk <= D_INCONSISTENT) {
3530                drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3531                return C_MASK;
3532        }
3533
3534        if (hg < 0 && /* by intention we do not use mydisk here. */
3535            device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3536                switch (rr_conflict) {
3537                case ASB_CALL_HELPER:
3538                        drbd_khelper(device, "pri-lost");
3539                        /* fall through */
3540                case ASB_DISCONNECT:
3541                        drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3542                        return C_MASK;
3543                case ASB_VIOLENTLY:
3544                        drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3545                             "assumption\n");
3546                }
3547        }
3548
3549        if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3550                if (hg == 0)
3551                        drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3552                else
3553                        drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3554                                 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3555                                 abs(hg) >= 2 ? "full" : "bit-map based");
3556                return C_MASK;
3557        }
3558
3559        if (abs(hg) >= 2) {
3560                drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3561                if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3562                                        BM_LOCKED_SET_ALLOWED))
3563                        return C_MASK;
3564        }
3565
3566        if (hg > 0) { /* become sync source. */
3567                rv = C_WF_BITMAP_S;
3568        } else if (hg < 0) { /* become sync target */
3569                rv = C_WF_BITMAP_T;
3570        } else {
3571                rv = C_CONNECTED;
3572                if (drbd_bm_total_weight(device)) {
3573                        drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3574                             drbd_bm_total_weight(device));
3575                }
3576        }
3577
3578        return rv;
3579}
3580
3581static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3582{
3583        /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3584        if (peer == ASB_DISCARD_REMOTE)
3585                return ASB_DISCARD_LOCAL;
3586
3587        /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3588        if (peer == ASB_DISCARD_LOCAL)
3589                return ASB_DISCARD_REMOTE;
3590
3591        /* everything else is valid if they are equal on both sides. */
3592        return peer;
3593}
3594
3595static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3596{
3597        struct p_protocol *p = pi->data;
3598        enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3599        int p_proto, p_discard_my_data, p_two_primaries, cf;
3600        struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3601        char integrity_alg[SHARED_SECRET_MAX] = "";
3602        struct crypto_ahash *peer_integrity_tfm = NULL;
3603        void *int_dig_in = NULL, *int_dig_vv = NULL;
3604
3605        p_proto         = be32_to_cpu(p->protocol);
3606        p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3607        p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3608        p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3609        p_two_primaries = be32_to_cpu(p->two_primaries);
3610        cf              = be32_to_cpu(p->conn_flags);
3611        p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3612
3613        if (connection->agreed_pro_version >= 87) {
3614                int err;
3615
3616                if (pi->size > sizeof(integrity_alg))
3617                        return -EIO;
3618                err = drbd_recv_all(connection, integrity_alg, pi->size);
3619                if (err)
3620                        return err;
3621                integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3622        }
3623
3624        if (pi->cmd != P_PROTOCOL_UPDATE) {
3625                clear_bit(CONN_DRY_RUN, &connection->flags);
3626
3627                if (cf & CF_DRY_RUN)
3628                        set_bit(CONN_DRY_RUN, &connection->flags);
3629
3630                rcu_read_lock();
3631                nc = rcu_dereference(connection->net_conf);
3632
3633                if (p_proto != nc->wire_protocol) {
3634                        drbd_err(connection, "incompatible %s settings\n", "protocol");
3635                        goto disconnect_rcu_unlock;
3636                }
3637
3638                if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3639                        drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3640                        goto disconnect_rcu_unlock;
3641                }
3642
3643                if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3644                        drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3645                        goto disconnect_rcu_unlock;
3646                }
3647
3648                if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3649                        drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3650                        goto disconnect_rcu_unlock;
3651                }
3652
3653                if (p_discard_my_data && nc->discard_my_data) {
3654                        drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3655                        goto disconnect_rcu_unlock;
3656                }
3657
3658                if (p_two_primaries != nc->two_primaries) {
3659                        drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3660                        goto disconnect_rcu_unlock;
3661                }
3662
3663                if (strcmp(integrity_alg, nc->integrity_alg)) {
3664                        drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3665                        goto disconnect_rcu_unlock;
3666                }
3667
3668                rcu_read_unlock();
3669        }
3670
3671        if (integrity_alg[0]) {
3672                int hash_size;
3673
3674                /*
3675                 * We can only change the peer data integrity algorithm
3676                 * here.  Changing our own data integrity algorithm
3677                 * requires that we send a P_PROTOCOL_UPDATE packet at
3678                 * the same time; otherwise, the peer has no way to
3679                 * tell between which packets the algorithm should
3680                 * change.
3681                 */
3682
3683                peer_integrity_tfm = crypto_alloc_ahash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3684                if (IS_ERR(peer_integrity_tfm)) {
3685                        peer_integrity_tfm = NULL;
3686                        drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3687                                 integrity_alg);
3688                        goto disconnect;
3689                }
3690
3691                hash_size = crypto_ahash_digestsize(peer_integrity_tfm);
3692                int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3693                int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3694                if (!(int_dig_in && int_dig_vv)) {
3695                        drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3696                        goto disconnect;
3697                }
3698        }
3699
3700        new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3701        if (!new_net_conf) {
3702                drbd_err(connection, "Allocation of new net_conf failed\n");
3703                goto disconnect;
3704        }
3705
3706        mutex_lock(&connection->data.mutex);
3707        mutex_lock(&connection->resource->conf_update);
3708        old_net_conf = connection->net_conf;
3709        *new_net_conf = *old_net_conf;
3710
3711        new_net_conf->wire_protocol = p_proto;
3712        new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3713        new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3714        new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3715        new_net_conf->two_primaries = p_two_primaries;
3716
3717        rcu_assign_pointer(connection->net_conf, new_net_conf);
3718        mutex_unlock(&connection->resource->conf_update);
3719        mutex_unlock(&connection->data.mutex);
3720
3721        crypto_free_ahash(connection->peer_integrity_tfm);
3722        kfree(connection->int_dig_in);
3723        kfree(connection->int_dig_vv);
3724        connection->peer_integrity_tfm = peer_integrity_tfm;
3725        connection->int_dig_in = int_dig_in;
3726        connection->int_dig_vv = int_dig_vv;
3727
3728        if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3729                drbd_info(connection, "peer data-integrity-alg: %s\n",
3730                          integrity_alg[0] ? integrity_alg : "(none)");
3731
3732        synchronize_rcu();
3733        kfree(old_net_conf);
3734        return 0;
3735
3736disconnect_rcu_unlock:
3737        rcu_read_unlock();
3738disconnect:
3739        crypto_free_ahash(peer_integrity_tfm);
3740        kfree(int_dig_in);
3741        kfree(int_dig_vv);
3742        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3743        return -EIO;
3744}
3745
3746/* helper function
3747 * input: alg name, feature name
3748 * return: NULL (alg name was "")
3749 *         ERR_PTR(error) if something goes wrong
3750 *         or the crypto hash ptr, if it worked out ok. */
3751static struct crypto_ahash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3752                const char *alg, const char *name)
3753{
3754        struct crypto_ahash *tfm;
3755
3756        if (!alg[0])
3757                return NULL;
3758
3759        tfm = crypto_alloc_ahash(alg, 0, CRYPTO_ALG_ASYNC);
3760        if (IS_ERR(tfm)) {
3761                drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3762                        alg, name, PTR_ERR(tfm));
3763                return tfm;
3764        }
3765        return tfm;
3766}
3767
3768static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3769{
3770        void *buffer = connection->data.rbuf;
3771        int size = pi->size;
3772
3773        while (size) {
3774                int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3775                s = drbd_recv(connection, buffer, s);
3776                if (s <= 0) {
3777                        if (s < 0)
3778                                return s;
3779                        break;
3780                }
3781                size -= s;
3782        }
3783        if (size)
3784                return -EIO;
3785        return 0;
3786}
3787
3788/*
3789 * config_unknown_volume  -  device configuration command for unknown volume
3790 *
3791 * When a device is added to an existing connection, the node on which the
3792 * device is added first will send configuration commands to its peer but the
3793 * peer will not know about the device yet.  It will warn and ignore these
3794 * commands.  Once the device is added on the second node, the second node will
3795 * send the same device configuration commands, but in the other direction.
3796 *
3797 * (We can also end up here if drbd is misconfigured.)
3798 */
3799static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3800{
3801        drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3802                  cmdname(pi->cmd), pi->vnr);
3803        return ignore_remaining_packet(connection, pi);
3804}
3805
3806static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3807{
3808        struct drbd_peer_device *peer_device;
3809        struct drbd_device *device;
3810        struct p_rs_param_95 *p;
3811        unsigned int header_size, data_size, exp_max_sz;
3812        struct crypto_ahash *verify_tfm = NULL;
3813        struct crypto_ahash *csums_tfm = NULL;
3814        struct net_conf *old_net_conf, *new_net_conf = NULL;
3815        struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3816        const int apv = connection->agreed_pro_version;
3817        struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3818        int fifo_size = 0;
3819        int err;
3820
3821        peer_device = conn_peer_device(connection, pi->vnr);
3822        if (!peer_device)
3823                return config_unknown_volume(connection, pi);
3824        device = peer_device->device;
3825
3826        exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3827                    : apv == 88 ? sizeof(struct p_rs_param)
3828                                        + SHARED_SECRET_MAX
3829                    : apv <= 94 ? sizeof(struct p_rs_param_89)
3830                    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3831
3832        if (pi->size > exp_max_sz) {
3833                drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3834                    pi->size, exp_max_sz);
3835                return -EIO;
3836        }
3837
3838        if (apv <= 88) {
3839                header_size = sizeof(struct p_rs_param);
3840                data_size = pi->size - header_size;
3841        } else if (apv <= 94) {
3842                header_size = sizeof(struct p_rs_param_89);
3843                data_size = pi->size - header_size;
3844                D_ASSERT(device, data_size == 0);
3845        } else {
3846                header_size = sizeof(struct p_rs_param_95);
3847                data_size = pi->size - header_size;
3848                D_ASSERT(device, data_size == 0);
3849        }
3850
3851        /* initialize verify_alg and csums_alg */
3852        p = pi->data;
3853        memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3854
3855        err = drbd_recv_all(peer_device->connection, p, header_size);
3856        if (err)
3857                return err;
3858
3859        mutex_lock(&connection->resource->conf_update);
3860        old_net_conf = peer_device->connection->net_conf;
3861        if (get_ldev(device)) {
3862                new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3863                if (!new_disk_conf) {
3864                        put_ldev(device);
3865                        mutex_unlock(&connection->resource->conf_update);
3866                        drbd_err(device, "Allocation of new disk_conf failed\n");
3867                        return -ENOMEM;
3868                }
3869
3870                old_disk_conf = device->ldev->disk_conf;
3871                *new_disk_conf = *old_disk_conf;
3872
3873                new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3874        }
3875
3876        if (apv >= 88) {
3877                if (apv == 88) {
3878                        if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3879                                drbd_err(device, "verify-alg of wrong size, "
3880                                        "peer wants %u, accepting only up to %u byte\n",
3881                                        data_size, SHARED_SECRET_MAX);
3882                                err = -EIO;
3883                                goto reconnect;
3884                        }
3885
3886                        err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3887                        if (err)
3888                                goto reconnect;
3889                        /* we expect NUL terminated string */
3890                        /* but just in case someone tries to be evil */
3891                        D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3892                        p->verify_alg[data_size-1] = 0;
3893
3894                } else /* apv >= 89 */ {
3895                        /* we still expect NUL terminated strings */
3896                        /* but just in case someone tries to be evil */
3897                        D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3898                        D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3899                        p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3900                        p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3901                }
3902
3903                if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3904                        if (device->state.conn == C_WF_REPORT_PARAMS) {
3905                                drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3906                                    old_net_conf->verify_alg, p->verify_alg);
3907                                goto disconnect;
3908                        }
3909                        verify_tfm = drbd_crypto_alloc_digest_safe(device,
3910                                        p->verify_alg, "verify-alg");
3911                        if (IS_ERR(verify_tfm)) {
3912                                verify_tfm = NULL;
3913                                goto disconnect;
3914                        }
3915                }
3916
3917                if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3918                        if (device->state.conn == C_WF_REPORT_PARAMS) {
3919                                drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3920                                    old_net_conf->csums_alg, p->csums_alg);
3921                                goto disconnect;
3922                        }
3923                        csums_tfm = drbd_crypto_alloc_digest_safe(device,
3924                                        p->csums_alg, "csums-alg");
3925                        if (IS_ERR(csums_tfm)) {
3926                                csums_tfm = NULL;
3927                                goto disconnect;
3928                        }
3929                }
3930
3931                if (apv > 94 && new_disk_conf) {
3932                        new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3933                        new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3934                        new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3935                        new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3936
3937                        fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3938                        if (fifo_size != device->rs_plan_s->size) {
3939                                new_plan = fifo_alloc(fifo_size);
3940                                if (!new_plan) {
3941                                        drbd_err(device, "kmalloc of fifo_buffer failed");
3942                                        put_ldev(device);
3943                                        goto disconnect;
3944                                }
3945                        }
3946                }
3947
3948                if (verify_tfm || csums_tfm) {
3949                        new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3950                        if (!new_net_conf) {
3951                                drbd_err(device, "Allocation of new net_conf failed\n");
3952                                goto disconnect;
3953                        }
3954
3955                        *new_net_conf = *old_net_conf;
3956
3957                        if (verify_tfm) {
3958                                strcpy(new_net_conf->verify_alg, p->verify_alg);
3959                                new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3960                                crypto_free_ahash(peer_device->connection->verify_tfm);
3961                                peer_device->connection->verify_tfm = verify_tfm;
3962                                drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3963                        }
3964                        if (csums_tfm) {
3965                                strcpy(new_net_conf->csums_alg, p->csums_alg);
3966                                new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3967                                crypto_free_ahash(peer_device->connection->csums_tfm);
3968                                peer_device->connection->csums_tfm = csums_tfm;
3969                                drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3970                        }
3971                        rcu_assign_pointer(connection->net_conf, new_net_conf);
3972                }
3973        }
3974
3975        if (new_disk_conf) {
3976                rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3977                put_ldev(device);
3978        }
3979
3980        if (new_plan) {
3981                old_plan = device->rs_plan_s;
3982                rcu_assign_pointer(device->rs_plan_s, new_plan);
3983        }
3984
3985        mutex_unlock(&connection->resource->conf_update);
3986        synchronize_rcu();
3987        if (new_net_conf)
3988                kfree(old_net_conf);
3989        kfree(old_disk_conf);
3990        kfree(old_plan);
3991
3992        return 0;
3993
3994reconnect:
3995        if (new_disk_conf) {
3996                put_ldev(device);
3997                kfree(new_disk_conf);
3998        }
3999        mutex_unlock(&connection->resource->conf_update);
4000        return -EIO;
4001
4002disconnect:
4003        kfree(new_plan);
4004        if (new_disk_conf) {
4005                put_ldev(device);
4006                kfree(new_disk_conf);
4007        }
4008        mutex_unlock(&connection->resource->conf_update);
4009        /* just for completeness: actually not needed,
4010         * as this is not reached if csums_tfm was ok. */
4011        crypto_free_ahash(csums_tfm);
4012        /* but free the verify_tfm again, if csums_tfm did not work out */
4013        crypto_free_ahash(verify_tfm);
4014        conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4015        return -EIO;
4016}
4017
4018/* warn if the arguments differ by more than 12.5% */
4019static void warn_if_differ_considerably(struct drbd_device *device,
4020        const char *s, sector_t a, sector_t b)
4021{
4022        sector_t d;
4023        if (a == 0 || b == 0)
4024                return;
4025        d = (a > b) ? (a - b) : (b - a);
4026        if (d > (a>>3) || d > (b>>3))
4027                drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
4028                     (unsigned long long)a, (unsigned long long)b);
4029}
4030
4031static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
4032{
4033        struct drbd_peer_device *peer_device;
4034        struct drbd_device *device;
4035        struct p_sizes *p = pi->data;
4036        struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ? p->qlim : NULL;
4037        enum determine_dev_size dd = DS_UNCHANGED;
4038        sector_t p_size, p_usize, p_csize, my_usize;
4039        int ldsc = 0; /* local disk size changed */
4040        enum dds_flags ddsf;
4041
4042        peer_device = conn_peer_device(connection, pi->vnr);
4043        if (!peer_device)
4044                return config_unknown_volume(connection, pi);
4045        device = peer_device->device;
4046
4047        p_size = be64_to_cpu(p->d_size);
4048        p_usize = be64_to_cpu(p->u_size);
4049        p_csize = be64_to_cpu(p->c_size);
4050
4051        /* just store the peer's disk size for now.
4052         * we still need to figure out whether we accept that. */
4053        device->p_size = p_size;
4054
4055        if (get_ldev(device)) {
4056                sector_t new_size, cur_size;
4057                rcu_read_lock();
4058                my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
4059                rcu_read_unlock();
4060
4061                warn_if_differ_considerably(device, "lower level device sizes",
4062                           p_size, drbd_get_max_capacity(device->ldev));
4063                warn_if_differ_considerably(device, "user requested size",
4064                                            p_usize, my_usize);
4065
4066                /* if this is the first connect, or an otherwise expected
4067                 * param exchange, choose the minimum */
4068                if (device->state.conn == C_WF_REPORT_PARAMS)
4069                        p_usize = min_not_zero(my_usize, p_usize);
4070
4071                /* Never shrink a device with usable data during connect.
4072                   But allow online shrinking if we are connected. */
4073                new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0);
4074                cur_size = drbd_get_capacity(device->this_bdev);
4075                if (new_size < cur_size &&
4076                    device->state.disk >= D_OUTDATED &&
4077                    device->state.conn < C_CONNECTED) {
4078                        drbd_err(device, "The peer's disk size is too small! (%llu < %llu sectors)\n",
4079                                        (unsigned long long)new_size, (unsigned long long)cur_size);
4080                        conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4081                        put_ldev(device);
4082                        return -EIO;
4083                }
4084
4085                if (my_usize != p_usize) {
4086                        struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
4087
4088                        new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
4089                        if (!new_disk_conf) {
4090                                drbd_err(device, "Allocation of new disk_conf failed\n");
4091                                put_ldev(device);
4092                                return -ENOMEM;
4093                        }
4094
4095                        mutex_lock(&connection->resource->conf_update);
4096                        old_disk_conf = device->ldev->disk_conf;
4097                        *new_disk_conf = *old_disk_conf;
4098                        new_disk_conf->disk_size = p_usize;
4099
4100                        rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
4101                        mutex_unlock(&connection->resource->conf_update);
4102                        synchronize_rcu();
4103                        kfree(old_disk_conf);
4104
4105                        drbd_info(device, "Peer sets u_size to %lu sectors\n",
4106                                 (unsigned long)my_usize);
4107                }
4108
4109                put_ldev(device);
4110        }
4111
4112        device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
4113        /* Leave drbd_reconsider_queue_parameters() before drbd_determine_dev_size().
4114           In case we cleared the QUEUE_FLAG_DISCARD from our queue in
4115           drbd_reconsider_queue_parameters(), we can be sure that after
4116           drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
4117
4118        ddsf = be16_to_cpu(p->dds_flags);
4119        if (get_ldev(device)) {
4120                drbd_reconsider_queue_parameters(device, device->ldev, o);
4121                dd = drbd_determine_dev_size(device, ddsf, NULL);
4122                put_ldev(device);
4123                if (dd == DS_ERROR)
4124                        return -EIO;
4125                drbd_md_sync(device);
4126        } else {
4127                /*
4128                 * I am diskless, need to accept the peer's *current* size.
4129                 * I must NOT accept the peers backing disk size,
4130                 * it may have been larger than mine all along...
4131                 *
4132                 * At this point, the peer knows more about my disk, or at
4133                 * least about what we last agreed upon, than myself.
4134                 * So if his c_size is less than his d_size, the most likely
4135                 * reason is that *my* d_size was smaller last time we checked.
4136                 *
4137                 * However, if he sends a zero current size,
4138                 * take his (user-capped or) backing disk size anyways.
4139                 */
4140                drbd_reconsider_queue_parameters(device, NULL, o);
4141                drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size);
4142        }
4143
4144        if (get_ldev(device)) {
4145                if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
4146                        device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
4147                        ldsc = 1;
4148                }
4149
4150                put_ldev(device);
4151        }
4152
4153        if (device->state.conn > C_WF_REPORT_PARAMS) {
4154                if (be64_to_cpu(p->c_size) !=
4155                    drbd_get_capacity(device->this_bdev) || ldsc) {
4156                        /* we have different sizes, probably peer
4157                         * needs to know my new size... */
4158                        drbd_send_sizes(peer_device, 0, ddsf);
4159                }
4160                if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
4161                    (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
4162                        if (device->state.pdsk >= D_INCONSISTENT &&
4163                            device->state.disk >= D_INCONSISTENT) {
4164                                if (ddsf & DDSF_NO_RESYNC)
4165                                        drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
4166                                else
4167                                        resync_after_online_grow(device);
4168                        } else
4169                                set_bit(RESYNC_AFTER_NEG, &device->flags);
4170                }
4171        }
4172
4173        return 0;
4174}
4175
4176static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
4177{
4178        struct drbd_peer_device *peer_device;
4179        struct drbd_device *device;
4180        struct p_uuids *p = pi->data;
4181        u64 *p_uuid;
4182        int i, updated_uuids = 0;
4183
4184        peer_device = conn_peer_device(connection, pi->vnr);
4185        if (!peer_device)
4186                return config_unknown_volume(connection, pi);
4187        device = peer_device->device;
4188
4189        p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
4190        if (!p_uuid) {
4191                drbd_err(device, "kmalloc of p_uuid failed\n");
4192                return false;
4193        }
4194
4195        for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
4196                p_uuid[i] = be64_to_cpu(p->uuid[i]);
4197
4198        kfree(device->p_uuid);
4199        device->p_uuid = p_uuid;
4200
4201        if (device->state.conn < C_CONNECTED &&
4202            device->state.disk < D_INCONSISTENT &&
4203            device->state.role == R_PRIMARY &&
4204            (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
4205                drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
4206                    (unsigned long long)device->ed_uuid);
4207                conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4208                return -EIO;
4209        }
4210
4211        if (get_ldev(device)) {
4212                int skip_initial_sync =
4213                        device->state.conn == C_CONNECTED &&
4214                        peer_device->connection->agreed_pro_version >= 90 &&
4215                        device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
4216                        (p_uuid[UI_FLAGS] & 8);
4217                if (skip_initial_sync) {
4218                        drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
4219                        drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
4220                                        "clear_n_write from receive_uuids",
4221                                        BM_LOCKED_TEST_ALLOWED);
4222                        _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
4223                        _drbd_uuid_set(device, UI_BITMAP, 0);
4224                        _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
4225                                        CS_VERBOSE, NULL);
4226                        drbd_md_sync(device);
4227                        updated_uuids = 1;
4228                }
4229                put_ldev(device);
4230        } else if (device->state.disk < D_INCONSISTENT &&
4231                   device->state.role == R_PRIMARY) {
4232                /* I am a diskless primary, the peer just created a new current UUID
4233                   for me. */
4234                updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4235        }
4236
4237        /* Before we test for the disk state, we should wait until an eventually
4238           ongoing cluster wide state change is finished. That is important if
4239           we are primary and are detaching from our disk. We need to see the
4240           new disk state... */
4241        mutex_lock(device->state_mutex);
4242        mutex_unlock(device->state_mutex);
4243        if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
4244                updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4245
4246        if (updated_uuids)
4247                drbd_print_uuids(device, "receiver updated UUIDs to");
4248
4249        return 0;
4250}
4251
4252/**
4253 * convert_state() - Converts the peer's view of the cluster state to our point of view
4254 * @ps:         The state as seen by the peer.
4255 */
4256static union drbd_state convert_state(union drbd_state ps)
4257{
4258        union drbd_state ms;
4259
4260        static enum drbd_conns c_tab[] = {
4261                [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
4262                [C_CONNECTED] = C_CONNECTED,
4263
4264                [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
4265                [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
4266                [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
4267                [C_VERIFY_S]       = C_VERIFY_T,
4268                [C_MASK]   = C_MASK,
4269        };
4270
4271        ms.i = ps.i;
4272
4273        ms.conn = c_tab[ps.conn];
4274        ms.peer = ps.role;
4275        ms.role = ps.peer;
4276        ms.pdsk = ps.disk;
4277        ms.disk = ps.pdsk;
4278        ms.peer_isp = (ps.aftr_isp | ps.user_isp);
4279
4280        return ms;
4281}
4282
4283static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4284{
4285        struct drbd_peer_device *peer_device;
4286        struct drbd_device *device;
4287        struct p_req_state *p = pi->data;
4288        union drbd_state mask, val;
4289        enum drbd_state_rv rv;
4290
4291        peer_device = conn_peer_device(connection, pi->vnr);
4292        if (!peer_device)
4293                return -EIO;
4294        device = peer_device->device;
4295
4296        mask.i = be32_to_cpu(p->mask);
4297        val.i = be32_to_cpu(p->val);
4298
4299        if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4300            mutex_is_locked(device->state_mutex)) {
4301                drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4302                return 0;
4303        }
4304
4305        mask = convert_state(mask);
4306        val = convert_state(val);
4307
4308        rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4309        drbd_send_sr_reply(peer_device, rv);
4310
4311        drbd_md_sync(device);
4312
4313        return 0;
4314}
4315
4316static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4317{
4318        struct p_req_state *p = pi->data;
4319        union drbd_state mask, val;
4320        enum drbd_state_rv rv;
4321
4322        mask.i = be32_to_cpu(p->mask);
4323        val.i = be32_to_cpu(p->val);
4324
4325        if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4326            mutex_is_locked(&connection->cstate_mutex)) {
4327                conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4328                return 0;
4329        }
4330
4331        mask = convert_state(mask);
4332        val = convert_state(val);
4333
4334        rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4335        conn_send_sr_reply(connection, rv);
4336
4337        return 0;
4338}
4339
4340static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4341{
4342        struct drbd_peer_device *peer_device;
4343        struct drbd_device *device;
4344        struct p_state *p = pi->data;
4345        union drbd_state os, ns, peer_state;
4346        enum drbd_disk_state real_peer_disk;
4347        enum chg_state_flags cs_flags;
4348        int rv;
4349
4350        peer_device = conn_peer_device(connection, pi->vnr);
4351        if (!peer_device)
4352                return config_unknown_volume(connection, pi);
4353        device = peer_device->device;
4354
4355        peer_state.i = be32_to_cpu(p->state);
4356
4357        real_peer_disk = peer_state.disk;
4358        if (peer_state.disk == D_NEGOTIATING) {
4359                real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4360                drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4361        }
4362
4363        spin_lock_irq(&device->resource->req_lock);
4364 retry:
4365        os = ns = drbd_read_state(device);
4366        spin_unlock_irq(&device->resource->req_lock);
4367
4368        /* If some other part of the code (ack_receiver thread, timeout)
4369         * already decided to close the connection again,
4370         * we must not "re-establish" it here. */
4371        if (os.conn <= C_TEAR_DOWN)
4372                return -ECONNRESET;
4373
4374        /* If this is the "end of sync" confirmation, usually the peer disk
4375         * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4376         * set) resync started in PausedSyncT, or if the timing of pause-/
4377         * unpause-sync events has been "just right", the peer disk may
4378         * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4379         */
4380        if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4381            real_peer_disk == D_UP_TO_DATE &&
4382            os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4383                /* If we are (becoming) SyncSource, but peer is still in sync
4384                 * preparation, ignore its uptodate-ness to avoid flapping, it
4385                 * will change to inconsistent once the peer reaches active
4386                 * syncing states.
4387                 * It may have changed syncer-paused flags, however, so we
4388                 * cannot ignore this completely. */
4389                if (peer_state.conn > C_CONNECTED &&
4390                    peer_state.conn < C_SYNC_SOURCE)
4391                        real_peer_disk = D_INCONSISTENT;
4392
4393                /* if peer_state changes to connected at the same time,
4394                 * it explicitly notifies us that it finished resync.
4395                 * Maybe we should finish it up, too? */
4396                else if (os.conn >= C_SYNC_SOURCE &&
4397                         peer_state.conn == C_CONNECTED) {
4398                        if (drbd_bm_total_weight(device) <= device->rs_failed)
4399                                drbd_resync_finished(device);
4400                        return 0;
4401                }
4402        }
4403
4404        /* explicit verify finished notification, stop sector reached. */
4405        if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4406            peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4407                ov_out_of_sync_print(device);
4408                drbd_resync_finished(device);
4409                return 0;
4410        }
4411
4412        /* peer says his disk is inconsistent, while we think it is uptodate,
4413         * and this happens while the peer still thinks we have a sync going on,
4414         * but we think we are already done with the sync.
4415         * We ignore this to avoid flapping pdsk.
4416         * This should not happen, if the peer is a recent version of drbd. */
4417        if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4418            os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4419                real_peer_disk = D_UP_TO_DATE;
4420
4421        if (ns.conn == C_WF_REPORT_PARAMS)
4422                ns.conn = C_CONNECTED;
4423
4424        if (peer_state.conn == C_AHEAD)
4425                ns.conn = C_BEHIND;
4426
4427        if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4428            get_ldev_if_state(device, D_NEGOTIATING)) {
4429                int cr; /* consider resync */
4430
4431                /* if we established a new connection */
4432                cr  = (os.conn < C_CONNECTED);
4433                /* if we had an established connection
4434                 * and one of the nodes newly attaches a disk */
4435                cr |= (os.conn == C_CONNECTED &&
4436                       (peer_state.disk == D_NEGOTIATING ||
4437                        os.disk == D_NEGOTIATING));
4438                /* if we have both been inconsistent, and the peer has been
4439                 * forced to be UpToDate with --overwrite-data */
4440                cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4441                /* if we had been plain connected, and the admin requested to
4442                 * start a sync by "invalidate" or "invalidate-remote" */
4443                cr |= (os.conn == C_CONNECTED &&
4444                                (peer_state.conn >= C_STARTING_SYNC_S &&
4445                                 peer_state.conn <= C_WF_BITMAP_T));
4446
4447                if (cr)
4448                        ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4449
4450                put_ldev(device);
4451                if (ns.conn == C_MASK) {
4452                        ns.conn = C_CONNECTED;
4453                        if (device->state.disk == D_NEGOTIATING) {
4454                                drbd_force_state(device, NS(disk, D_FAILED));
4455                        } else if (peer_state.disk == D_NEGOTIATING) {
4456                                drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4457                                peer_state.disk = D_DISKLESS;
4458                                real_peer_disk = D_DISKLESS;
4459                        } else {
4460                                if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4461                                        return -EIO;
4462                                D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4463                                conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4464                                return -EIO;
4465                        }
4466                }
4467        }
4468
4469        spin_lock_irq(&device->resource->req_lock);
4470        if (os.i != drbd_read_state(device).i)
4471                goto retry;
4472        clear_bit(CONSIDER_RESYNC, &device->flags);
4473        ns.peer = peer_state.role;
4474        ns.pdsk = real_peer_disk;
4475        ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4476        if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4477                ns.disk = device->new_state_tmp.disk;
4478        cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4479        if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4480            test_bit(NEW_CUR_UUID, &device->flags)) {
4481                /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4482                   for temporal network outages! */
4483                spin_unlock_irq(&device->resource->req_lock);
4484                drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4485                tl_clear(peer_device->connection);
4486                drbd_uuid_new_current(device);
4487                clear_bit(NEW_CUR_UUID, &device->flags);
4488                conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4489                return -EIO;
4490        }
4491        rv = _drbd_set_state(device, ns, cs_flags, NULL);
4492        ns = drbd_read_state(device);
4493        spin_unlock_irq(&device->resource->req_lock);
4494
4495        if (rv < SS_SUCCESS) {
4496                conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4497                return -EIO;
4498        }
4499
4500        if (os.conn > C_WF_REPORT_PARAMS) {
4501                if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4502                    peer_state.disk != D_NEGOTIATING ) {
4503                        /* we want resync, peer has not yet decided to sync... */
4504                        /* Nowadays only used when forcing a node into primary role and
4505                           setting its disk to UpToDate with that */
4506                        drbd_send_uuids(peer_device);
4507                        drbd_send_current_state(peer_device);
4508                }
4509        }
4510
4511        clear_bit(DISCARD_MY_DATA, &device->flags);
4512
4513        drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4514
4515        return 0;
4516}
4517
4518static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4519{
4520        struct drbd_peer_device *peer_device;
4521        struct drbd_device *device;
4522        struct p_rs_uuid *p = pi->data;
4523
4524        peer_device = conn_peer_device(connection, pi->vnr);
4525        if (!peer_device)
4526                return -EIO;
4527        device = peer_device->device;
4528
4529        wait_event(device->misc_wait,
4530                   device->state.conn == C_WF_SYNC_UUID ||
4531                   device->state.conn == C_BEHIND ||
4532                   device->state.conn < C_CONNECTED ||
4533                   device->state.disk < D_NEGOTIATING);
4534
4535        /* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4536
4537        /* Here the _drbd_uuid_ functions are right, current should
4538           _not_ be rotated into the history */
4539        if (get_ldev_if_state(device, D_NEGOTIATING)) {
4540                _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4541                _drbd_uuid_set(device, UI_BITMAP, 0UL);
4542
4543                drbd_print_uuids(device, "updated sync uuid");
4544                drbd_start_resync(device, C_SYNC_TARGET);
4545
4546                put_ldev(device);
4547        } else
4548                drbd_err(device, "Ignoring SyncUUID packet!\n");
4549
4550        return 0;
4551}
4552
4553/**
4554 * receive_bitmap_plain
4555 *
4556 * Return 0 when done, 1 when another iteration is needed, and a negative error
4557 * code upon failure.
4558 */
4559static int
4560receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4561                     unsigned long *p, struct bm_xfer_ctx *c)
4562{
4563        unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4564                                 drbd_header_size(peer_device->connection);
4565        unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4566                                       c->bm_words - c->word_offset);
4567        unsigned int want = num_words * sizeof(*p);
4568        int err;
4569
4570        if (want != size) {
4571                drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4572                return -EIO;
4573        }
4574        if (want == 0)
4575                return 0;
4576        err = drbd_recv_all(peer_device->connection, p, want);
4577        if (err)
4578                return err;
4579
4580        drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4581
4582        c->word_offset += num_words;
4583        c->bit_offset = c->word_offset * BITS_PER_LONG;
4584        if (c->bit_offset > c->bm_bits)
4585                c->bit_offset = c->bm_bits;
4586
4587        return 1;
4588}
4589
4590static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4591{
4592        return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4593}
4594
4595static int dcbp_get_start(struct p_compressed_bm *p)
4596{
4597        return (p->encoding & 0x80) != 0;
4598}
4599
4600static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4601{
4602        return (p->encoding >> 4) & 0x7;
4603}
4604
4605/**
4606 * recv_bm_rle_bits
4607 *
4608 * Return 0 when done, 1 when another iteration is needed, and a negative error
4609 * code upon failure.
4610 */
4611static int
4612recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4613                struct p_compressed_bm *p,
4614                 struct bm_xfer_ctx *c,
4615                 unsigned int len)
4616{
4617        struct bitstream bs;
4618        u64 look_ahead;
4619        u64 rl;
4620        u64 tmp;
4621        unsigned long s = c->bit_offset;
4622        unsigned long e;
4623        int toggle = dcbp_get_start(p);
4624        int have;
4625        int bits;
4626
4627        bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4628
4629        bits = bitstream_get_bits(&bs, &look_ahead, 64);
4630        if (bits < 0)
4631                return -EIO;
4632
4633        for (have = bits; have > 0; s += rl, toggle = !toggle) {
4634                bits = vli_decode_bits(&rl, look_ahead);
4635                if (bits <= 0)
4636                        return -EIO;
4637
4638                if (toggle) {
4639                        e = s + rl -1;
4640                        if (e >= c->bm_bits) {
4641                                drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4642                                return -EIO;
4643                        }
4644                        _drbd_bm_set_bits(peer_device->device, s, e);
4645                }
4646
4647                if (have < bits) {
4648                        drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4649                                have, bits, look_ahead,
4650                                (unsigned int)(bs.cur.b - p->code),
4651                                (unsigned int)bs.buf_len);
4652                        return -EIO;
4653                }
4654                /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4655                if (likely(bits < 64))
4656                        look_ahead >>= bits;
4657                else
4658                        look_ahead = 0;
4659                have -= bits;
4660
4661                bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4662                if (bits < 0)
4663                        return -EIO;
4664                look_ahead |= tmp << have;
4665                have += bits;
4666        }
4667
4668        c->bit_offset = s;
4669        bm_xfer_ctx_bit_to_word_offset(c);
4670
4671        return (s != c->bm_bits);
4672}
4673
4674/**
4675 * decode_bitmap_c
4676 *
4677 * Return 0 when done, 1 when another iteration is needed, and a negative error
4678 * code upon failure.
4679 */
4680static int
4681decode_bitmap_c(struct drbd_peer_device *peer_device,
4682                struct p_compressed_bm *p,
4683                struct bm_xfer_ctx *c,
4684                unsigned int len)
4685{
4686        if (dcbp_get_code(p) == RLE_VLI_Bits)
4687                return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4688
4689        /* other variants had been implemented for evaluation,
4690         * but have been dropped as this one turned out to be "best"
4691         * during all our tests. */
4692
4693        drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4694        conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4695        return -EIO;
4696}
4697
4698void INFO_bm_xfer_stats(struct drbd_device *device,
4699                const char *direction, struct bm_xfer_ctx *c)
4700{
4701        /* what would it take to transfer it "plaintext" */
4702        unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4703        unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4704        unsigned int plain =
4705                header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4706                c->bm_words * sizeof(unsigned long);
4707        unsigned int total = c->bytes[0] + c->bytes[1];
4708        unsigned int r;
4709
4710        /* total can not be zero. but just in case: */
4711        if (total == 0)
4712                return;
4713
4714        /* don't report if not compressed */
4715        if (total >= plain)
4716                return;
4717
4718        /* total < plain. check for overflow, still */
4719        r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4720                                    : (1000 * total / plain);
4721
4722        if (r > 1000)
4723                r = 1000;
4724
4725        r = 1000 - r;
4726        drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4727             "total %u; compression: %u.%u%%\n",
4728                        direction,
4729                        c->bytes[1], c->packets[1],
4730                        c->bytes[0], c->packets[0],
4731                        total, r/10, r % 10);
4732}
4733
4734/* Since we are processing the bitfield from lower addresses to higher,
4735   it does not matter if the process it in 32 bit chunks or 64 bit
4736   chunks as long as it is little endian. (Understand it as byte stream,
4737   beginning with the lowest byte...) If we would use big endian
4738   we would need to process it from the highest address to the lowest,
4739   in order to be agnostic to the 32 vs 64 bits issue.
4740
4741   returns 0 on failure, 1 if we successfully received it. */
4742static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4743{
4744        struct drbd_peer_device *peer_device;
4745        struct drbd_device *device;
4746        struct bm_xfer_ctx c;
4747        int err;
4748
4749        peer_device = conn_peer_device(connection, pi->vnr);
4750        if (!peer_device)
4751                return -EIO;
4752        device = peer_device->device;
4753
4754        drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4755        /* you are supposed to send additional out-of-sync information
4756         * if you actually set bits during this phase */
4757
4758        c = (struct bm_xfer_ctx) {
4759                .bm_bits = drbd_bm_bits(device),
4760                .bm_words = drbd_bm_words(device),
4761        };
4762
4763        for(;;) {
4764                if (pi->cmd == P_BITMAP)
4765                        err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4766                else if (pi->cmd == P_COMPRESSED_BITMAP) {
4767                        /* MAYBE: sanity check that we speak proto >= 90,
4768                         * and the feature is enabled! */
4769                        struct p_compressed_bm *p = pi->data;
4770
4771                        if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4772                                drbd_err(device, "ReportCBitmap packet too large\n");
4773                                err = -EIO;
4774                                goto out;
4775                        }
4776                        if (pi->size <= sizeof(*p)) {
4777                                drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4778                                err = -EIO;
4779                                goto out;
4780                        }
4781                        err = drbd_recv_all(peer_device->connection, p, pi->size);
4782                        if (err)
4783                               goto out;
4784                        err = decode_bitmap_c(peer_device, p, &c, pi->size);
4785                } else {
4786                        drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4787                        err = -EIO;
4788                        goto out;
4789                }
4790
4791                c.packets[pi->cmd == P_BITMAP]++;
4792                c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4793
4794                if (err <= 0) {
4795                        if (err < 0)
4796                                goto out;
4797                        break;
4798                }
4799                err = drbd_recv_header(peer_device->connection, pi);
4800                if (err)
4801                        goto out;
4802        }
4803
4804        INFO_bm_xfer_stats(device, "receive", &c);
4805
4806        if (device->state.conn == C_WF_BITMAP_T) {
4807                enum drbd_state_rv rv;
4808
4809                err = drbd_send_bitmap(device);
4810                if (err)
4811                        goto out;
4812                /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4813                rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4814                D_ASSERT(device, rv == SS_SUCCESS);
4815        } else if (device->state.conn != C_WF_BITMAP_S) {
4816                /* admin may have requested C_DISCONNECTING,
4817                 * other threads may have noticed network errors */
4818                drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4819                    drbd_conn_str(device->state.conn));
4820        }
4821        err = 0;
4822
4823 out:
4824        drbd_bm_unlock(device);
4825        if (!err && device->state.conn == C_WF_BITMAP_S)
4826                drbd_start_resync(device, C_SYNC_SOURCE);
4827        return err;
4828}
4829
4830static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4831{
4832        drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4833                 pi->cmd, pi->size);
4834
4835        return ignore_remaining_packet(connection, pi);
4836}
4837
4838static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4839{
4840        /* Make sure we've acked all the TCP data associated
4841         * with the data requests being unplugged */
4842        drbd_tcp_quickack(connection->data.socket);
4843
4844        return 0;
4845}
4846
4847static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4848{
4849        struct drbd_peer_device *peer_device;
4850        struct drbd_device *device;
4851        struct p_block_desc *p = pi->data;
4852
4853        peer_device = conn_peer_device(connection, pi->vnr);
4854        if (!peer_device)
4855                return -EIO;
4856        device = peer_device->device;
4857
4858        switch (device->state.conn) {
4859        case C_WF_SYNC_UUID:
4860        case C_WF_BITMAP_T:
4861        case C_BEHIND:
4862                        break;
4863        default:
4864                drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4865                                drbd_conn_str(device->state.conn));
4866        }
4867
4868        drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4869
4870        return 0;
4871}
4872
4873static int receive_rs_deallocated(struct drbd_connection *connection, struct packet_info *pi)
4874{
4875        struct drbd_peer_device *peer_device;
4876        struct p_block_desc *p = pi->data;
4877        struct drbd_device *device;
4878        sector_t sector;
4879        int size, err = 0;
4880
4881        peer_device = conn_peer_device(connection, pi->vnr);
4882        if (!peer_device)
4883                return -EIO;
4884        device = peer_device->device;
4885
4886        sector = be64_to_cpu(p->sector);
4887        size = be32_to_cpu(p->blksize);
4888
4889        dec_rs_pending(device);
4890
4891        if (get_ldev(device)) {
4892                struct drbd_peer_request *peer_req;
4893                const int op = REQ_OP_DISCARD;
4894
4895                peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector,
4896                                               size, 0, GFP_NOIO);
4897                if (!peer_req) {
4898                        put_ldev(device);
4899                        return -ENOMEM;
4900                }
4901
4902                peer_req->w.cb = e_end_resync_block;
4903                peer_req->submit_jif = jiffies;
4904                peer_req->flags |= EE_IS_TRIM;
4905
4906                spin_lock_irq(&device->resource->req_lock);
4907                list_add_tail(&peer_req->w.list, &device->sync_ee);
4908                spin_unlock_irq(&device->resource->req_lock);
4909
4910                atomic_add(pi->size >> 9, &device->rs_sect_ev);
4911                err = drbd_submit_peer_request(device, peer_req, op, 0, DRBD_FAULT_RS_WR);
4912
4913                if (err) {
4914                        spin_lock_irq(&device->resource->req_lock);
4915                        list_del(&peer_req->w.list);
4916                        spin_unlock_irq(&device->resource->req_lock);
4917
4918                        drbd_free_peer_req(device, peer_req);
4919                        put_ldev(device);
4920                        err = 0;
4921                        goto fail;
4922                }
4923
4924                inc_unacked(device);
4925
4926                /* No put_ldev() here. Gets called in drbd_endio_write_sec_final(),
4927                   as well as drbd_rs_complete_io() */
4928        } else {
4929        fail:
4930                drbd_rs_complete_io(device, sector);
4931                drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER);
4932        }
4933
4934        atomic_add(size >> 9, &device->rs_sect_in);
4935
4936        return err;
4937}
4938
4939struct data_cmd {
4940        int expect_payload;
4941        unsigned int pkt_size;
4942        int (*fn)(struct drbd_connection *, struct packet_info *);
4943};
4944
4945static struct data_cmd drbd_cmd_handler[] = {
4946        [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4947        [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4948        [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4949        [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4950        [P_BITMAP]          = { 1, 0, receive_bitmap } ,
4951        [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4952        [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4953        [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4954        [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4955        [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4956        [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4957        [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4958        [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4959        [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4960        [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4961        [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4962        [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4963        [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4964        [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4965        [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4966        [P_RS_THIN_REQ]     = { 0, sizeof(struct p_block_req), receive_DataRequest },
4967        [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4968        [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4969        [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4970        [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4971        [P_TRIM]            = { 0, sizeof(struct p_trim), receive_Data },
4972        [P_RS_DEALLOCATED]  = { 0, sizeof(struct p_block_desc), receive_rs_deallocated },
4973        [P_WSAME]           = { 1, sizeof(struct p_wsame), receive_Data },
4974};
4975
4976static void drbdd(struct drbd_connection *connection)
4977{
4978        struct packet_info pi;
4979        size_t shs; /* sub header size */
4980        int err;
4981
4982        while (get_t_state(&connection->receiver) == RUNNING) {
4983                struct data_cmd const *cmd;
4984
4985                drbd_thread_current_set_cpu(&connection->receiver);
4986                update_receiver_timing_details(connection, drbd_recv_header);
4987                if (drbd_recv_header(connection, &pi))
4988                        goto err_out;
4989
4990                cmd = &drbd_cmd_handler[pi.cmd];
4991                if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4992                        drbd_err(connection, "Unexpected data packet %s (0x%04x)",
4993                                 cmdname(pi.cmd), pi.cmd);
4994                        goto err_out;
4995                }
4996
4997                shs = cmd->pkt_size;
4998                if (pi.cmd == P_SIZES && connection->agreed_features & DRBD_FF_WSAME)
4999                        shs += sizeof(struct o_qlim);
5000                if (pi.size > shs && !cmd->expect_payload) {
5001                        drbd_err(connection, "No payload expected %s l:%d\n",
5002                                 cmdname(pi.cmd), pi.size);
5003                        goto err_out;
5004                }
5005                if (pi.size < shs) {
5006                        drbd_err(connection, "%s: unexpected packet size, expected:%d received:%d\n",
5007                                 cmdname(pi.cmd), (int)shs, pi.size);
5008                        goto err_out;
5009                }
5010
5011                if (shs) {
5012                        update_receiver_timing_details(connection, drbd_recv_all_warn);
5013                        err = drbd_recv_all_warn(connection, pi.data, shs);
5014                        if (err)
5015                                goto err_out;
5016                        pi.size -= shs;
5017                }
5018
5019                update_receiver_timing_details(connection, cmd->fn);
5020                err = cmd->fn(connection, &pi);
5021                if (err) {
5022                        drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
5023                                 cmdname(pi.cmd), err, pi.size);
5024                        goto err_out;
5025                }
5026        }
5027        return;
5028
5029    err_out:
5030        conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
5031}
5032
5033static void conn_disconnect(struct drbd_connection *connection)
5034{
5035        struct drbd_peer_device *peer_device;
5036        enum drbd_conns oc;
5037        int vnr;
5038
5039        if (connection->cstate == C_STANDALONE)
5040                return;
5041
5042        /* We are about to start the cleanup after connection loss.
5043         * Make sure drbd_make_request knows about that.
5044         * Usually we should be in some network failure state already,
5045         * but just in case we are not, we fix it up here.
5046         */
5047        conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5048
5049        /* ack_receiver does not clean up anything. it must not interfere, either */
5050        drbd_thread_stop(&connection->ack_receiver);
5051        if (connection->ack_sender) {
5052                destroy_workqueue(connection->ack_sender);
5053                connection->ack_sender = NULL;
5054        }
5055        drbd_free_sock(connection);
5056
5057        rcu_read_lock();
5058        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5059                struct drbd_device *device = peer_device->device;
5060                kref_get(&device->kref);
5061                rcu_read_unlock();
5062                drbd_disconnected(peer_device);
5063                kref_put(&device->kref, drbd_destroy_device);
5064                rcu_read_lock();
5065        }
5066        rcu_read_unlock();
5067
5068        if (!list_empty(&connection->current_epoch->list))
5069                drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
5070        /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
5071        atomic_set(&connection->current_epoch->epoch_size, 0);
5072        connection->send.seen_any_write_yet = false;
5073
5074        drbd_info(connection, "Connection closed\n");
5075
5076        if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
5077                conn_try_outdate_peer_async(connection);
5078
5079        spin_lock_irq(&connection->resource->req_lock);
5080        oc = connection->cstate;
5081        if (oc >= C_UNCONNECTED)
5082                _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
5083
5084        spin_unlock_irq(&connection->resource->req_lock);
5085
5086        if (oc == C_DISCONNECTING)
5087                conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
5088}
5089
5090static int drbd_disconnected(struct drbd_peer_device *peer_device)
5091{
5092        struct drbd_device *device = peer_device->device;
5093        unsigned int i;
5094
5095        /* wait for current activity to cease. */
5096        spin_lock_irq(&device->resource->req_lock);
5097        _drbd_wait_ee_list_empty(device, &device->active_ee);
5098        _drbd_wait_ee_list_empty(device, &device->sync_ee);
5099        _drbd_wait_ee_list_empty(device, &device->read_ee);
5100        spin_unlock_irq(&device->resource->req_lock);
5101
5102        /* We do not have data structures that would allow us to
5103         * get the rs_pending_cnt down to 0 again.
5104         *  * On C_SYNC_TARGET we do not have any data structures describing
5105         *    the pending RSDataRequest's we have sent.
5106         *  * On C_SYNC_SOURCE there is no data structure that tracks
5107         *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
5108         *  And no, it is not the sum of the reference counts in the
5109         *  resync_LRU. The resync_LRU tracks the whole operation including
5110         *  the disk-IO, while the rs_pending_cnt only tracks the blocks
5111         *  on the fly. */
5112        drbd_rs_cancel_all(device);
5113        device->rs_total = 0;
5114        device->rs_failed = 0;
5115        atomic_set(&device->rs_pending_cnt, 0);
5116        wake_up(&device->misc_wait);
5117
5118        del_timer_sync(&device->resync_timer);
5119        resync_timer_fn((unsigned long)device);
5120
5121        /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
5122         * w_make_resync_request etc. which may still be on the worker queue
5123         * to be "canceled" */
5124        drbd_flush_workqueue(&peer_device->connection->sender_work);
5125
5126        drbd_finish_peer_reqs(device);
5127
5128        /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
5129           might have issued a work again. The one before drbd_finish_peer_reqs() is
5130           necessary to reclain net_ee in drbd_finish_peer_reqs(). */
5131        drbd_flush_workqueue(&peer_device->connection->sender_work);
5132
5133        /* need to do it again, drbd_finish_peer_reqs() may have populated it
5134         * again via drbd_try_clear_on_disk_bm(). */
5135        drbd_rs_cancel_all(device);
5136
5137        kfree(device->p_uuid);
5138        device->p_uuid = NULL;
5139
5140        if (!drbd_suspended(device))
5141                tl_clear(peer_device->connection);
5142
5143        drbd_md_sync(device);
5144
5145        if (get_ldev(device)) {
5146                drbd_bitmap_io(device, &drbd_bm_write_copy_pages,
5147                                "write from disconnected", BM_LOCKED_CHANGE_ALLOWED);
5148                put_ldev(device);
5149        }
5150
5151        /* tcp_close and release of sendpage pages can be deferred.  I don't
5152         * want to use SO_LINGER, because apparently it can be deferred for
5153         * more than 20 seconds (longest time I checked).
5154         *
5155         * Actually we don't care for exactly when the network stack does its
5156         * put_page(), but release our reference on these pages right here.
5157         */
5158        i = drbd_free_peer_reqs(device, &device->net_ee);
5159        if (i)
5160                drbd_info(device, "net_ee not empty, killed %u entries\n", i);
5161        i = atomic_read(&device->pp_in_use_by_net);
5162        if (i)
5163                drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
5164        i = atomic_read(&device->pp_in_use);
5165        if (i)
5166                drbd_info(device, "pp_in_use = %d, expected 0\n", i);
5167
5168        D_ASSERT(device, list_empty(&device->read_ee));
5169        D_ASSERT(device, list_empty(&device->active_ee));
5170        D_ASSERT(device, list_empty(&device->sync_ee));
5171        D_ASSERT(device, list_empty(&device->done_ee));
5172
5173        return 0;
5174}
5175
5176/*
5177 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
5178 * we can agree on is stored in agreed_pro_version.
5179 *
5180 * feature flags and the reserved array should be enough room for future
5181 * enhancements of the handshake protocol, and possible plugins...
5182 *
5183 * for now, they are expected to be zero, but ignored.
5184 */
5185static int drbd_send_features(struct drbd_connection *connection)
5186{
5187        struct drbd_socket *sock;
5188        struct p_connection_features *p;
5189
5190        sock = &connection->data;
5191        p = conn_prepare_command(connection, sock);
5192        if (!p)
5193                return -EIO;
5194        memset(p, 0, sizeof(*p));
5195        p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
5196        p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
5197        p->feature_flags = cpu_to_be32(PRO_FEATURES);
5198        return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
5199}
5200
5201/*
5202 * return values:
5203 *   1 yes, we have a valid connection
5204 *   0 oops, did not work out, please try again
5205 *  -1 peer talks different language,
5206 *     no point in trying again, please go standalone.
5207 */
5208static int drbd_do_features(struct drbd_connection *connection)
5209{
5210        /* ASSERT current == connection->receiver ... */
5211        struct p_connection_features *p;
5212        const int expect = sizeof(struct p_connection_features);
5213        struct packet_info pi;
5214        int err;
5215
5216        err = drbd_send_features(connection);
5217        if (err)
5218                return 0;
5219
5220        err = drbd_recv_header(connection, &pi);
5221        if (err)
5222                return 0;
5223
5224        if (pi.cmd != P_CONNECTION_FEATURES) {
5225                drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
5226                         cmdname(pi.cmd), pi.cmd);
5227                return -1;
5228        }
5229
5230        if (pi.size != expect) {
5231                drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
5232                     expect, pi.size);
5233                return -1;
5234        }
5235
5236        p = pi.data;
5237        err = drbd_recv_all_warn(connection, p, expect);
5238        if (err)
5239                return 0;
5240
5241        p->protocol_min = be32_to_cpu(p->protocol_min);
5242        p->protocol_max = be32_to_cpu(p->protocol_max);
5243        if (p->protocol_max == 0)
5244                p->protocol_max = p->protocol_min;
5245
5246        if (PRO_VERSION_MAX < p->protocol_min ||
5247            PRO_VERSION_MIN > p->protocol_max)
5248                goto incompat;
5249
5250        connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
5251        connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
5252
5253        drbd_info(connection, "Handshake successful: "
5254             "Agreed network protocol version %d\n", connection->agreed_pro_version);
5255
5256        drbd_info(connection, "Feature flags enabled on protocol level: 0x%x%s%s%s.\n",
5257                  connection->agreed_features,
5258                  connection->agreed_features & DRBD_FF_TRIM ? " TRIM" : "",
5259                  connection->agreed_features & DRBD_FF_THIN_RESYNC ? " THIN_RESYNC" : "",
5260                  connection->agreed_features & DRBD_FF_WSAME ? " WRITE_SAME" :
5261                  connection->agreed_features ? "" : " none");
5262
5263        return 1;
5264
5265 incompat:
5266        drbd_err(connection, "incompatible DRBD dialects: "
5267            "I support %d-%d, peer supports %d-%d\n",
5268            PRO_VERSION_MIN, PRO_VERSION_MAX,
5269            p->protocol_min, p->protocol_max);
5270        return -1;
5271}
5272
5273#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
5274static int drbd_do_auth(struct drbd_connection *connection)
5275{
5276        drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
5277        drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
5278        return -1;
5279}
5280#else
5281#define CHALLENGE_LEN 64
5282
5283/* Return value:
5284        1 - auth succeeded,
5285        0 - failed, try again (network error),
5286        -1 - auth failed, don't try again.
5287*/
5288
5289static int drbd_do_auth(struct drbd_connection *connection)
5290{
5291        struct drbd_socket *sock;
5292        char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
5293        char *response = NULL;
5294        char *right_response = NULL;
5295        char *peers_ch = NULL;
5296        unsigned int key_len;
5297        char secret[SHARED_SECRET_MAX]; /* 64 byte */
5298        unsigned int resp_size;
5299        SHASH_DESC_ON_STACK(desc, connection->cram_hmac_tfm);
5300        struct packet_info pi;
5301        struct net_conf *nc;
5302        int err, rv;
5303
5304        /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
5305
5306        rcu_read_lock();
5307        nc = rcu_dereference(connection->net_conf);
5308        key_len = strlen(nc->shared_secret);
5309        memcpy(secret, nc->shared_secret, key_len);
5310        rcu_read_unlock();
5311
5312        desc->tfm = connection->cram_hmac_tfm;
5313        desc->flags = 0;
5314
5315        rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
5316        if (rv) {
5317                drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
5318                rv = -1;
5319                goto fail;
5320        }
5321
5322        get_random_bytes(my_challenge, CHALLENGE_LEN);
5323
5324        sock = &connection->data;
5325        if (!conn_prepare_command(connection, sock)) {
5326                rv = 0;
5327                goto fail;
5328        }
5329        rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
5330                                my_challenge, CHALLENGE_LEN);
5331        if (!rv)
5332                goto fail;
5333
5334        err = drbd_recv_header(connection, &pi);
5335        if (err) {
5336                rv = 0;
5337                goto fail;
5338        }
5339
5340        if (pi.cmd != P_AUTH_CHALLENGE) {
5341                drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
5342                         cmdname(pi.cmd), pi.cmd);
5343                rv = 0;
5344                goto fail;
5345        }
5346
5347        if (pi.size > CHALLENGE_LEN * 2) {
5348                drbd_err(connection, "expected AuthChallenge payload too big.\n");
5349                rv = -1;
5350                goto fail;
5351        }
5352
5353        if (pi.size < CHALLENGE_LEN) {
5354                drbd_err(connection, "AuthChallenge payload too small.\n");
5355                rv = -1;
5356                goto fail;
5357        }
5358
5359        peers_ch = kmalloc(pi.size, GFP_NOIO);
5360        if (peers_ch == NULL) {
5361                drbd_err(connection, "kmalloc of peers_ch failed\n");
5362                rv = -1;
5363                goto fail;
5364        }
5365
5366        err = drbd_recv_all_warn(connection, peers_ch, pi.size);
5367        if (err) {
5368                rv = 0;
5369                goto fail;
5370        }
5371
5372        if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
5373                drbd_err(connection, "Peer presented the same challenge!\n");
5374                rv = -1;
5375                goto fail;
5376        }
5377
5378        resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
5379        response = kmalloc(resp_size, GFP_NOIO);
5380        if (response == NULL) {
5381                drbd_err(connection, "kmalloc of response failed\n");
5382                rv = -1;
5383                goto fail;
5384        }
5385
5386        rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
5387        if (rv) {
5388                drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5389                rv = -1;
5390                goto fail;
5391        }
5392
5393        if (!conn_prepare_command(connection, sock)) {
5394                rv = 0;
5395                goto fail;
5396        }
5397        rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5398                                response, resp_size);
5399        if (!rv)
5400                goto fail;
5401
5402        err = drbd_recv_header(connection, &pi);
5403        if (err) {
5404                rv = 0;
5405                goto fail;
5406        }
5407
5408        if (pi.cmd != P_AUTH_RESPONSE) {
5409                drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5410                         cmdname(pi.cmd), pi.cmd);
5411                rv = 0;
5412                goto fail;
5413        }
5414
5415        if (pi.size != resp_size) {
5416                drbd_err(connection, "expected AuthResponse payload of wrong size\n");
5417                rv = 0;
5418                goto fail;
5419        }
5420
5421        err = drbd_recv_all_warn(connection, response , resp_size);
5422        if (err) {
5423                rv = 0;
5424                goto fail;
5425        }
5426
5427        right_response = kmalloc(resp_size, GFP_NOIO);
5428        if (right_response == NULL) {
5429                drbd_err(connection, "kmalloc of right_response failed\n");
5430                rv = -1;
5431                goto fail;
5432        }
5433
5434        rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
5435                                 right_response);
5436        if (rv) {
5437                drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5438                rv = -1;
5439                goto fail;
5440        }
5441
5442        rv = !memcmp(response, right_response, resp_size);
5443
5444        if (rv)
5445                drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5446                     resp_size);
5447        else
5448                rv = -1;
5449
5450 fail:
5451        kfree(peers_ch);
5452        kfree(response);
5453        kfree(right_response);
5454        shash_desc_zero(desc);
5455
5456        return rv;
5457}
5458#endif
5459
5460int drbd_receiver(struct drbd_thread *thi)
5461{
5462        struct drbd_connection *connection = thi->connection;
5463        int h;
5464
5465        drbd_info(connection, "receiver (re)started\n");
5466
5467        do {
5468                h = conn_connect(connection);
5469                if (h == 0) {
5470                        conn_disconnect(connection);
5471                        schedule_timeout_interruptible(HZ);
5472                }
5473                if (h == -1) {
5474                        drbd_warn(connection, "Discarding network configuration.\n");
5475                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5476                }
5477        } while (h == 0);
5478
5479        if (h > 0)
5480                drbdd(connection);
5481
5482        conn_disconnect(connection);
5483
5484        drbd_info(connection, "receiver terminated\n");
5485        return 0;
5486}
5487
5488/* ********* acknowledge sender ******** */
5489
5490static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5491{
5492        struct p_req_state_reply *p = pi->data;
5493        int retcode = be32_to_cpu(p->retcode);
5494
5495        if (retcode >= SS_SUCCESS) {
5496                set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5497        } else {
5498                set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5499                drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5500                         drbd_set_st_err_str(retcode), retcode);
5501        }
5502        wake_up(&connection->ping_wait);
5503
5504        return 0;
5505}
5506
5507static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5508{
5509        struct drbd_peer_device *peer_device;
5510        struct drbd_device *device;
5511        struct p_req_state_reply *p = pi->data;
5512        int retcode = be32_to_cpu(p->retcode);
5513
5514        peer_device = conn_peer_device(connection, pi->vnr);
5515        if (!peer_device)
5516                return -EIO;
5517        device = peer_device->device;
5518
5519        if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5520                D_ASSERT(device, connection->agreed_pro_version < 100);
5521                return got_conn_RqSReply(connection, pi);
5522        }
5523
5524        if (retcode >= SS_SUCCESS) {
5525                set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5526        } else {
5527                set_bit(CL_ST_CHG_FAIL, &device->flags);
5528                drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5529                        drbd_set_st_err_str(retcode), retcode);
5530        }
5531        wake_up(&device->state_wait);
5532
5533        return 0;
5534}
5535
5536static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
5537{
5538        return drbd_send_ping_ack(connection);
5539
5540}
5541
5542static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5543{
5544        /* restore idle timeout */
5545        connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5546        if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5547                wake_up(&connection->ping_wait);
5548
5549        return 0;
5550}
5551
5552static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5553{
5554        struct drbd_peer_device *peer_device;
5555        struct drbd_device *device;
5556        struct p_block_ack *p = pi->data;
5557        sector_t sector = be64_to_cpu(p->sector);
5558        int blksize = be32_to_cpu(p->blksize);
5559
5560        peer_device = conn_peer_device(connection, pi->vnr);
5561        if (!peer_device)
5562                return -EIO;
5563        device = peer_device->device;
5564
5565        D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5566
5567        update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5568
5569        if (get_ldev(device)) {
5570                drbd_rs_complete_io(device, sector);
5571                drbd_set_in_sync(device, sector, blksize);
5572                /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5573                device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5574                put_ldev(device);
5575        }
5576        dec_rs_pending(device);
5577        atomic_add(blksize >> 9, &device->rs_sect_in);
5578
5579        return 0;
5580}
5581
5582static int
5583validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5584                              struct rb_root *root, const char *func,
5585                              enum drbd_req_event what, bool missing_ok)
5586{
5587        struct drbd_request *req;
5588        struct bio_and_error m;
5589
5590        spin_lock_irq(&device->resource->req_lock);
5591        req = find_request(device, root, id, sector, missing_ok, func);
5592        if (unlikely(!req)) {
5593                spin_unlock_irq(&device->resource->req_lock);
5594                return -EIO;
5595        }
5596        __req_mod(req, what, &m);
5597        spin_unlock_irq(&device->resource->req_lock);
5598
5599        if (m.bio)
5600                complete_master_bio(device, &m);
5601        return 0;
5602}
5603
5604static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5605{
5606        struct drbd_peer_device *peer_device;
5607        struct drbd_device *device;
5608        struct p_block_ack *p = pi->data;
5609        sector_t sector = be64_to_cpu(p->sector);
5610        int blksize = be32_to_cpu(p->blksize);
5611        enum drbd_req_event what;
5612
5613        peer_device = conn_peer_device(connection, pi->vnr);
5614        if (!peer_device)
5615                return -EIO;
5616        device = peer_device->device;
5617
5618        update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5619
5620        if (p->block_id == ID_SYNCER) {
5621                drbd_set_in_sync(device, sector, blksize);
5622                dec_rs_pending(device);
5623                return 0;
5624        }
5625        switch (pi->cmd) {
5626        case P_RS_WRITE_ACK:
5627                what = WRITE_ACKED_BY_PEER_AND_SIS;
5628                break;
5629        case P_WRITE_ACK:
5630                what = WRITE_ACKED_BY_PEER;
5631                break;
5632        case P_RECV_ACK:
5633                what = RECV_ACKED_BY_PEER;
5634                break;
5635        case P_SUPERSEDED:
5636                what = CONFLICT_RESOLVED;
5637                break;
5638        case P_RETRY_WRITE:
5639                what = POSTPONE_WRITE;
5640                break;
5641        default:
5642                BUG();
5643        }
5644
5645        return validate_req_change_req_state(device, p->block_id, sector,
5646                                             &device->write_requests, __func__,
5647                                             what, false);
5648}
5649
5650static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5651{
5652        struct drbd_peer_device *peer_device;
5653        struct drbd_device *device;
5654        struct p_block_ack *p = pi->data;
5655        sector_t sector = be64_to_cpu(p->sector);
5656        int size = be32_to_cpu(p->blksize);
5657        int err;
5658
5659        peer_device = conn_peer_device(connection, pi->vnr);
5660        if (!peer_device)
5661                return -EIO;
5662        device = peer_device->device;
5663
5664        update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5665
5666        if (p->block_id == ID_SYNCER) {
5667                dec_rs_pending(device);
5668                drbd_rs_failed_io(device, sector, size);
5669                return 0;
5670        }
5671
5672        err = validate_req_change_req_state(device, p->block_id, sector,
5673                                            &device->write_requests, __func__,
5674                                            NEG_ACKED, true);
5675        if (err) {
5676                /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5677                   The master bio might already be completed, therefore the
5678                   request is no longer in the collision hash. */
5679                /* In Protocol B we might already have got a P_RECV_ACK
5680                   but then get a P_NEG_ACK afterwards. */
5681                drbd_set_out_of_sync(device, sector, size);
5682        }
5683        return 0;
5684}
5685
5686static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5687{
5688        struct drbd_peer_device *peer_device;
5689        struct drbd_device *device;
5690        struct p_block_ack *p = pi->data;
5691        sector_t sector = be64_to_cpu(p->sector);
5692
5693        peer_device = conn_peer_device(connection, pi->vnr);
5694        if (!peer_device)
5695                return -EIO;
5696        device = peer_device->device;
5697
5698        update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5699
5700        drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5701            (unsigned long long)sector, be32_to_cpu(p->blksize));
5702
5703        return validate_req_change_req_state(device, p->block_id, sector,
5704                                             &device->read_requests, __func__,
5705                                             NEG_ACKED, false);
5706}
5707
5708static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5709{
5710        struct drbd_peer_device *peer_device;
5711        struct drbd_device *device;
5712        sector_t sector;
5713        int size;
5714        struct p_block_ack *p = pi->data;
5715
5716        peer_device = conn_peer_device(connection, pi->vnr);
5717        if (!peer_device)
5718                return -EIO;
5719        device = peer_device->device;
5720
5721        sector = be64_to_cpu(p->sector);
5722        size = be32_to_cpu(p->blksize);
5723
5724        update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5725
5726        dec_rs_pending(device);
5727
5728        if (get_ldev_if_state(device, D_FAILED)) {
5729                drbd_rs_complete_io(device, sector);
5730                switch (pi->cmd) {
5731                case P_NEG_RS_DREPLY:
5732                        drbd_rs_failed_io(device, sector, size);
5733                case P_RS_CANCEL:
5734                        break;
5735                default:
5736                        BUG();
5737                }
5738                put_ldev(device);
5739        }
5740
5741        return 0;
5742}
5743
5744static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5745{
5746        struct p_barrier_ack *p = pi->data;
5747        struct drbd_peer_device *peer_device;
5748        int vnr;
5749
5750        tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5751
5752        rcu_read_lock();
5753        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5754                struct drbd_device *device = peer_device->device;
5755
5756                if (device->state.conn == C_AHEAD &&
5757                    atomic_read(&device->ap_in_flight) == 0 &&
5758                    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5759                        device->start_resync_timer.expires = jiffies + HZ;
5760                        add_timer(&device->start_resync_timer);
5761                }
5762        }
5763        rcu_read_unlock();
5764
5765        return 0;
5766}
5767
5768static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5769{
5770        struct drbd_peer_device *peer_device;
5771        struct drbd_device *device;
5772        struct p_block_ack *p = pi->data;
5773        struct drbd_device_work *dw;
5774        sector_t sector;
5775        int size;
5776
5777        peer_device = conn_peer_device(connection, pi->vnr);
5778        if (!peer_device)
5779                return -EIO;
5780        device = peer_device->device;
5781
5782        sector = be64_to_cpu(p->sector);
5783        size = be32_to_cpu(p->blksize);
5784
5785        update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5786
5787        if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5788                drbd_ov_out_of_sync_found(device, sector, size);
5789        else
5790                ov_out_of_sync_print(device);
5791
5792        if (!get_ldev(device))
5793                return 0;
5794
5795        drbd_rs_complete_io(device, sector);
5796        dec_rs_pending(device);
5797
5798        --device->ov_left;
5799
5800        /* let's advance progress step marks only for every other megabyte */
5801        if ((device->ov_left & 0x200) == 0x200)
5802                drbd_advance_rs_marks(device, device->ov_left);
5803
5804        if (device->ov_left == 0) {
5805                dw = kmalloc(sizeof(*dw), GFP_NOIO);
5806                if (dw) {
5807                        dw->w.cb = w_ov_finished;
5808                        dw->device = device;
5809                        drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5810                } else {
5811                        drbd_err(device, "kmalloc(dw) failed.");
5812                        ov_out_of_sync_print(device);
5813                        drbd_resync_finished(device);
5814                }
5815        }
5816        put_ldev(device);
5817        return 0;
5818}
5819
5820static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
5821{
5822        return 0;
5823}
5824
5825struct meta_sock_cmd {
5826        size_t pkt_size;
5827        int (*fn)(struct drbd_connection *connection, struct packet_info *);
5828};
5829
5830static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
5831{
5832        long t;
5833        struct net_conf *nc;
5834
5835        rcu_read_lock();
5836        nc = rcu_dereference(connection->net_conf);
5837        t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5838        rcu_read_unlock();
5839
5840        t *= HZ;
5841        if (ping_timeout)
5842                t /= 10;
5843
5844        connection->meta.socket->sk->sk_rcvtimeo = t;
5845}
5846
5847static void set_ping_timeout(struct drbd_connection *connection)
5848{
5849        set_rcvtimeo(connection, 1);
5850}
5851
5852static void set_idle_timeout(struct drbd_connection *connection)
5853{
5854        set_rcvtimeo(connection, 0);
5855}
5856
5857static struct meta_sock_cmd ack_receiver_tbl[] = {
5858        [P_PING]            = { 0, got_Ping },
5859        [P_PING_ACK]        = { 0, got_PingAck },
5860        [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5861        [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5862        [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5863        [P_SUPERSEDED]   = { sizeof(struct p_block_ack), got_BlockAck },
5864        [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5865        [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5866        [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5867        [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5868        [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5869        [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5870        [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5871        [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5872        [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5873        [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5874        [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5875};
5876
5877int drbd_ack_receiver(struct drbd_thread *thi)
5878{
5879        struct drbd_connection *connection = thi->connection;
5880        struct meta_sock_cmd *cmd = NULL;
5881        struct packet_info pi;
5882        unsigned long pre_recv_jif;
5883        int rv;
5884        void *buf    = connection->meta.rbuf;
5885        int received = 0;
5886        unsigned int header_size = drbd_header_size(connection);
5887        int expect   = header_size;
5888        bool ping_timeout_active = false;
5889        struct sched_param param = { .sched_priority = 2 };
5890
5891        rv = sched_setscheduler(current, SCHED_RR, &param);
5892        if (rv < 0)
5893                drbd_err(connection, "drbd_ack_receiver: ERROR set priority, ret=%d\n", rv);
5894
5895        while (get_t_state(thi) == RUNNING) {
5896                drbd_thread_current_set_cpu(thi);
5897
5898                conn_reclaim_net_peer_reqs(connection);
5899
5900                if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5901                        if (drbd_send_ping(connection)) {
5902                                drbd_err(connection, "drbd_send_ping has failed\n");
5903                                goto reconnect;
5904                        }
5905                        set_ping_timeout(connection);
5906                        ping_timeout_active = true;
5907                }
5908
5909                pre_recv_jif = jiffies;
5910                rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5911
5912                /* Note:
5913                 * -EINTR        (on meta) we got a signal
5914                 * -EAGAIN       (on meta) rcvtimeo expired
5915                 * -ECONNRESET   other side closed the connection
5916                 * -ERESTARTSYS  (on data) we got a signal
5917                 * rv <  0       other than above: unexpected error!
5918                 * rv == expected: full header or command
5919                 * rv <  expected: "woken" by signal during receive
5920                 * rv == 0       : "connection shut down by peer"
5921                 */
5922                if (likely(rv > 0)) {
5923                        received += rv;
5924                        buf      += rv;
5925                } else if (rv == 0) {
5926                        if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5927                                long t;
5928                                rcu_read_lock();
5929                                t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5930                                rcu_read_unlock();
5931
5932                                t = wait_event_timeout(connection->ping_wait,
5933                                                       connection->cstate < C_WF_REPORT_PARAMS,
5934                                                       t);
5935                                if (t)
5936                                        break;
5937                        }
5938                        drbd_err(connection, "meta connection shut down by peer.\n");
5939                        goto reconnect;
5940                } else if (rv == -EAGAIN) {
5941                        /* If the data socket received something meanwhile,
5942                         * that is good enough: peer is still alive. */
5943                        if (time_after(connection->last_received, pre_recv_jif))
5944                                continue;
5945                        if (ping_timeout_active) {
5946                                drbd_err(connection, "PingAck did not arrive in time.\n");
5947                                goto reconnect;
5948                        }
5949                        set_bit(SEND_PING, &connection->flags);
5950                        continue;
5951                } else if (rv == -EINTR) {
5952                        /* maybe drbd_thread_stop(): the while condition will notice.
5953                         * maybe woken for send_ping: we'll send a ping above,
5954                         * and change the rcvtimeo */
5955                        flush_signals(current);
5956                        continue;
5957                } else {
5958                        drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5959                        goto reconnect;
5960                }
5961
5962                if (received == expect && cmd == NULL) {
5963                        if (decode_header(connection, connection->meta.rbuf, &pi))
5964                                goto reconnect;
5965                        cmd = &ack_receiver_tbl[pi.cmd];
5966                        if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
5967                                drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5968                                         cmdname(pi.cmd), pi.cmd);
5969                                goto disconnect;
5970                        }
5971                        expect = header_size + cmd->pkt_size;
5972                        if (pi.size != expect - header_size) {
5973                                drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5974                                        pi.cmd, pi.size);
5975                                goto reconnect;
5976                        }
5977                }
5978                if (received == expect) {
5979                        bool err;
5980
5981                        err = cmd->fn(connection, &pi);
5982                        if (err) {
5983                                drbd_err(connection, "%pf failed\n", cmd->fn);
5984                                goto reconnect;
5985                        }
5986
5987                        connection->last_received = jiffies;
5988
5989                        if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
5990                                set_idle_timeout(connection);
5991                                ping_timeout_active = false;
5992                        }
5993
5994                        buf      = connection->meta.rbuf;
5995                        received = 0;
5996                        expect   = header_size;
5997                        cmd      = NULL;
5998                }
5999        }
6000
6001        if (0) {
6002reconnect:
6003                conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6004                conn_md_sync(connection);
6005        }
6006        if (0) {
6007disconnect:
6008                conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
6009        }
6010
6011        drbd_info(connection, "ack_receiver terminated\n");
6012
6013        return 0;
6014}
6015
6016void drbd_send_acks_wf(struct work_struct *ws)
6017{
6018        struct drbd_peer_device *peer_device =
6019                container_of(ws, struct drbd_peer_device, send_acks_work);
6020        struct drbd_connection *connection = peer_device->connection;
6021        struct drbd_device *device = peer_device->device;
6022        struct net_conf *nc;
6023        int tcp_cork, err;
6024
6025        rcu_read_lock();
6026        nc = rcu_dereference(connection->net_conf);
6027        tcp_cork = nc->tcp_cork;
6028        rcu_read_unlock();
6029
6030        if (tcp_cork)
6031                drbd_tcp_cork(connection->meta.socket);
6032
6033        err = drbd_finish_peer_reqs(device);
6034        kref_put(&device->kref, drbd_destroy_device);
6035        /* get is in drbd_endio_write_sec_final(). That is necessary to keep the
6036           struct work_struct send_acks_work alive, which is in the peer_device object */
6037
6038        if (err) {
6039                conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6040                return;
6041        }
6042
6043        if (tcp_cork)
6044                drbd_tcp_uncork(connection->meta.socket);
6045
6046        return;
6047}
6048