linux/drivers/block/drbd/drbd_receiver.c
<<
>>
Prefs
   1/*
   2   drbd_receiver.c
   3
   4   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
   5
   6   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   7   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   8   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
   9
  10   drbd is free software; you can redistribute it and/or modify
  11   it under the terms of the GNU General Public License as published by
  12   the Free Software Foundation; either version 2, or (at your option)
  13   any later version.
  14
  15   drbd is distributed in the hope that it will be useful,
  16   but WITHOUT ANY WARRANTY; without even the implied warranty of
  17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  18   GNU General Public License for more details.
  19
  20   You should have received a copy of the GNU General Public License
  21   along with drbd; see the file COPYING.  If not, write to
  22   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
  23 */
  24
  25
  26#include <linux/module.h>
  27
  28#include <asm/uaccess.h>
  29#include <net/sock.h>
  30
  31#include <linux/drbd.h>
  32#include <linux/fs.h>
  33#include <linux/file.h>
  34#include <linux/in.h>
  35#include <linux/mm.h>
  36#include <linux/memcontrol.h>
  37#include <linux/mm_inline.h>
  38#include <linux/slab.h>
  39#include <linux/pkt_sched.h>
  40#define __KERNEL_SYSCALLS__
  41#include <linux/unistd.h>
  42#include <linux/vmalloc.h>
  43#include <linux/random.h>
  44#include <linux/string.h>
  45#include <linux/scatterlist.h>
  46#include "drbd_int.h"
  47#include "drbd_protocol.h"
  48#include "drbd_req.h"
  49#include "drbd_vli.h"
  50
  51#define PRO_FEATURES (FF_TRIM)
  52
  53struct packet_info {
  54        enum drbd_packet cmd;
  55        unsigned int size;
  56        unsigned int vnr;
  57        void *data;
  58};
  59
  60enum finish_epoch {
  61        FE_STILL_LIVE,
  62        FE_DESTROYED,
  63        FE_RECYCLED,
  64};
  65
  66static int drbd_do_features(struct drbd_connection *connection);
  67static int drbd_do_auth(struct drbd_connection *connection);
  68static int drbd_disconnected(struct drbd_peer_device *);
  69static void conn_wait_active_ee_empty(struct drbd_connection *connection);
  70static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
  71static int e_end_block(struct drbd_work *, int);
  72
  73
  74#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
  75
/*
 * Some helper functions to deal with singly linked page lists,
 * page->private being our "next" pointer.
 */
  80
  81/* If at least n pages are linked at head, get n pages off.
  82 * Otherwise, don't modify head, and return NULL.
  83 * Locking is the responsibility of the caller.
  84 */
  85static struct page *page_chain_del(struct page **head, int n)
  86{
  87        struct page *page;
  88        struct page *tmp;
  89
  90        BUG_ON(!n);
  91        BUG_ON(!head);
  92
  93        page = *head;
  94
  95        if (!page)
  96                return NULL;
  97
  98        while (page) {
  99                tmp = page_chain_next(page);
 100                if (--n == 0)
 101                        break; /* found sufficient pages */
 102                if (tmp == NULL)
 103                        /* insufficient pages, don't use any of them. */
 104                        return NULL;
 105                page = tmp;
 106        }
 107
 108        /* add end of list marker for the returned list */
 109        set_page_private(page, 0);
 110        /* actual return value, and adjustment of head */
 111        page = *head;
 112        *head = tmp;
 113        return page;
 114}
 115
 116/* may be used outside of locks to find the tail of a (usually short)
 117 * "private" page chain, before adding it back to a global chain head
 118 * with page_chain_add() under a spinlock. */
 119static struct page *page_chain_tail(struct page *page, int *len)
 120{
 121        struct page *tmp;
 122        int i = 1;
 123        while ((tmp = page_chain_next(page)))
 124                ++i, page = tmp;
 125        if (len)
 126                *len = i;
 127        return page;
 128}
 129
 130static int page_chain_free(struct page *page)
 131{
 132        struct page *tmp;
 133        int i = 0;
 134        page_chain_for_each_safe(page, tmp) {
 135                put_page(page);
 136                ++i;
 137        }
 138        return i;
 139}
 140
 141static void page_chain_add(struct page **head,
 142                struct page *chain_first, struct page *chain_last)
 143{
 144#if 1
 145        struct page *tmp;
 146        tmp = page_chain_tail(chain_first, NULL);
 147        BUG_ON(tmp != chain_last);
 148#endif
 149
 150        /* add chain to head */
 151        set_page_private(chain_last, (unsigned long)*head);
 152        *head = chain_first;
 153}
 154
 155static struct page *__drbd_alloc_pages(struct drbd_device *device,
 156                                       unsigned int number)
 157{
 158        struct page *page = NULL;
 159        struct page *tmp = NULL;
 160        unsigned int i = 0;
 161
 162        /* Yes, testing drbd_pp_vacant outside the lock is racy.
 163         * So what. It saves a spin_lock. */
 164        if (drbd_pp_vacant >= number) {
 165                spin_lock(&drbd_pp_lock);
 166                page = page_chain_del(&drbd_pp_pool, number);
 167                if (page)
 168                        drbd_pp_vacant -= number;
 169                spin_unlock(&drbd_pp_lock);
 170                if (page)
 171                        return page;
 172        }
 173
 174        /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
 175         * "criss-cross" setup, that might cause write-out on some other DRBD,
 176         * which in turn might block on the other node at this very place.  */
 177        for (i = 0; i < number; i++) {
 178                tmp = alloc_page(GFP_TRY);
 179                if (!tmp)
 180                        break;
 181                set_page_private(tmp, (unsigned long)page);
 182                page = tmp;
 183        }
 184
 185        if (i == number)
 186                return page;
 187
 188        /* Not enough pages immediately available this time.
 189         * No need to jump around here, drbd_alloc_pages will retry this
 190         * function "soon". */
 191        if (page) {
 192                tmp = page_chain_tail(page, NULL);
 193                spin_lock(&drbd_pp_lock);
 194                page_chain_add(&drbd_pp_pool, page, tmp);
 195                drbd_pp_vacant += i;
 196                spin_unlock(&drbd_pp_lock);
 197        }
 198        return NULL;
 199}
 200
 201static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
 202                                           struct list_head *to_be_freed)
 203{
 204        struct drbd_peer_request *peer_req, *tmp;
 205
 206        /* The EEs are always appended to the end of the list. Since
 207           they are sent in order over the wire, they have to finish
 208           in order. As soon as we see the first not finished we can
 209           stop to examine the list... */
 210
 211        list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
 212                if (drbd_peer_req_has_active_page(peer_req))
 213                        break;
 214                list_move(&peer_req->w.list, to_be_freed);
 215        }
 216}
 217
 218static void drbd_kick_lo_and_reclaim_net(struct drbd_device *device)
 219{
 220        LIST_HEAD(reclaimed);
 221        struct drbd_peer_request *peer_req, *t;
 222
 223        spin_lock_irq(&device->resource->req_lock);
 224        reclaim_finished_net_peer_reqs(device, &reclaimed);
 225        spin_unlock_irq(&device->resource->req_lock);
 226
 227        list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
 228                drbd_free_net_peer_req(device, peer_req);
 229}
 230
 231/**
 232 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 233 * @device:     DRBD device.
 234 * @number:     number of pages requested
 235 * @retry:      whether to retry, if not enough pages are available right now
 236 *
 237 * Tries to allocate number pages, first from our own page pool, then from
 238 * the kernel.
 239 * Possibly retry until DRBD frees sufficient pages somewhere else.
 240 *
 241 * If this allocation would exceed the max_buffers setting, we throttle
 242 * allocation (schedule_timeout) to give the system some room to breathe.
 243 *
 244 * We do not use max-buffers as hard limit, because it could lead to
 245 * congestion and further to a distributed deadlock during online-verify or
 246 * (checksum based) resync, if the max-buffers, socket buffer sizes and
 247 * resync-rate settings are mis-configured.
 248 *
 249 * Returns a page chain linked via page->private.
 250 */
 251struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
 252                              bool retry)
 253{
 254        struct drbd_device *device = peer_device->device;
 255        struct page *page = NULL;
 256        struct net_conf *nc;
 257        DEFINE_WAIT(wait);
 258        unsigned int mxb;
 259
 260        rcu_read_lock();
 261        nc = rcu_dereference(peer_device->connection->net_conf);
 262        mxb = nc ? nc->max_buffers : 1000000;
 263        rcu_read_unlock();
 264
 265        if (atomic_read(&device->pp_in_use) < mxb)
 266                page = __drbd_alloc_pages(device, number);
 267
 268        while (page == NULL) {
 269                prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
 270
 271                drbd_kick_lo_and_reclaim_net(device);
 272
 273                if (atomic_read(&device->pp_in_use) < mxb) {
 274                        page = __drbd_alloc_pages(device, number);
 275                        if (page)
 276                                break;
 277                }
 278
 279                if (!retry)
 280                        break;
 281
 282                if (signal_pending(current)) {
 283                        drbd_warn(device, "drbd_alloc_pages interrupted!\n");
 284                        break;
 285                }
 286
 287                if (schedule_timeout(HZ/10) == 0)
 288                        mxb = UINT_MAX;
 289        }
 290        finish_wait(&drbd_pp_wait, &wait);
 291
 292        if (page)
 293                atomic_add(number, &device->pp_in_use);
 294        return page;
 295}
 296
 297/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 298 * Is also used from inside an other spin_lock_irq(&resource->req_lock);
 299 * Either links the page chain back to the global pool,
 300 * or returns all pages to the system. */
 301static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
 302{
 303        atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
 304        int i;
 305
 306        if (page == NULL)
 307                return;
 308
 309        if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
 310                i = page_chain_free(page);
 311        else {
 312                struct page *tmp;
 313                tmp = page_chain_tail(page, &i);
 314                spin_lock(&drbd_pp_lock);
 315                page_chain_add(&drbd_pp_pool, page, tmp);
 316                drbd_pp_vacant += i;
 317                spin_unlock(&drbd_pp_lock);
 318        }
 319        i = atomic_sub_return(i, a);
 320        if (i < 0)
 321                drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
 322                        is_net ? "pp_in_use_by_net" : "pp_in_use", i);
 323        wake_up(&drbd_pp_wait);
 324}
 325
 326/*
 327You need to hold the req_lock:
 328 _drbd_wait_ee_list_empty()
 329
 330You must not have the req_lock:
 331 drbd_free_peer_req()
 332 drbd_alloc_peer_req()
 333 drbd_free_peer_reqs()
 334 drbd_ee_fix_bhs()
 335 drbd_finish_peer_reqs()
 336 drbd_clear_done_ee()
 337 drbd_wait_ee_list_empty()
 338*/
 339
 340struct drbd_peer_request *
 341drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
 342                    unsigned int data_size, bool has_payload, gfp_t gfp_mask) __must_hold(local)
 343{
 344        struct drbd_device *device = peer_device->device;
 345        struct drbd_peer_request *peer_req;
 346        struct page *page = NULL;
 347        unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
 348
 349        if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
 350                return NULL;
 351
 352        peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
 353        if (!peer_req) {
 354                if (!(gfp_mask & __GFP_NOWARN))
 355                        drbd_err(device, "%s: allocation failed\n", __func__);
 356                return NULL;
 357        }
 358
 359        if (has_payload && data_size) {
 360                page = drbd_alloc_pages(peer_device, nr_pages, (gfp_mask & __GFP_WAIT));
 361                if (!page)
 362                        goto fail;
 363        }
 364
 365        memset(peer_req, 0, sizeof(*peer_req));
 366        INIT_LIST_HEAD(&peer_req->w.list);
 367        drbd_clear_interval(&peer_req->i);
 368        peer_req->i.size = data_size;
 369        peer_req->i.sector = sector;
 370        peer_req->submit_jif = jiffies;
 371        peer_req->peer_device = peer_device;
 372        peer_req->pages = page;
 373        /*
 374         * The block_id is opaque to the receiver.  It is not endianness
 375         * converted, and sent back to the sender unchanged.
 376         */
 377        peer_req->block_id = id;
 378
 379        return peer_req;
 380
 381 fail:
 382        mempool_free(peer_req, drbd_ee_mempool);
 383        return NULL;
 384}
 385
 386void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
 387                       int is_net)
 388{
 389        might_sleep();
 390        if (peer_req->flags & EE_HAS_DIGEST)
 391                kfree(peer_req->digest);
 392        drbd_free_pages(device, peer_req->pages, is_net);
 393        D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
 394        D_ASSERT(device, drbd_interval_empty(&peer_req->i));
 395        if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
 396                peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
 397                drbd_al_complete_io(device, &peer_req->i);
 398        }
 399        mempool_free(peer_req, drbd_ee_mempool);
 400}
 401
 402int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
 403{
 404        LIST_HEAD(work_list);
 405        struct drbd_peer_request *peer_req, *t;
 406        int count = 0;
 407        int is_net = list == &device->net_ee;
 408
 409        spin_lock_irq(&device->resource->req_lock);
 410        list_splice_init(list, &work_list);
 411        spin_unlock_irq(&device->resource->req_lock);
 412
 413        list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
 414                __drbd_free_peer_req(device, peer_req, is_net);
 415                count++;
 416        }
 417        return count;
 418}
 419
 420/*
 421 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
 422 */
 423static int drbd_finish_peer_reqs(struct drbd_device *device)
 424{
 425        LIST_HEAD(work_list);
 426        LIST_HEAD(reclaimed);
 427        struct drbd_peer_request *peer_req, *t;
 428        int err = 0;
 429
 430        spin_lock_irq(&device->resource->req_lock);
 431        reclaim_finished_net_peer_reqs(device, &reclaimed);
 432        list_splice_init(&device->done_ee, &work_list);
 433        spin_unlock_irq(&device->resource->req_lock);
 434
 435        list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
 436                drbd_free_net_peer_req(device, peer_req);
 437
 438        /* possible callbacks here:
 439         * e_end_block, and e_end_resync_block, e_send_superseded.
 440         * all ignore the last argument.
 441         */
 442        list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
 443                int err2;
 444
 445                /* list_del not necessary, next/prev members not touched */
 446                err2 = peer_req->w.cb(&peer_req->w, !!err);
 447                if (!err)
 448                        err = err2;
 449                drbd_free_peer_req(device, peer_req);
 450        }
 451        wake_up(&device->ee_wait);
 452
 453        return err;
 454}
 455
 456static void _drbd_wait_ee_list_empty(struct drbd_device *device,
 457                                     struct list_head *head)
 458{
 459        DEFINE_WAIT(wait);
 460
 461        /* avoids spin_lock/unlock
 462         * and calling prepare_to_wait in the fast path */
 463        while (!list_empty(head)) {
 464                prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
 465                spin_unlock_irq(&device->resource->req_lock);
 466                io_schedule();
 467                finish_wait(&device->ee_wait, &wait);
 468                spin_lock_irq(&device->resource->req_lock);
 469        }
 470}
 471
 472static void drbd_wait_ee_list_empty(struct drbd_device *device,
 473                                    struct list_head *head)
 474{
 475        spin_lock_irq(&device->resource->req_lock);
 476        _drbd_wait_ee_list_empty(device, head);
 477        spin_unlock_irq(&device->resource->req_lock);
 478}
 479
 480static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
 481{
 482        struct kvec iov = {
 483                .iov_base = buf,
 484                .iov_len = size,
 485        };
 486        struct msghdr msg = {
 487                .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
 488        };
 489        return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
 490}
 491
 492static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
 493{
 494        int rv;
 495
 496        rv = drbd_recv_short(connection->data.socket, buf, size, 0);
 497
 498        if (rv < 0) {
 499                if (rv == -ECONNRESET)
 500                        drbd_info(connection, "sock was reset by peer\n");
 501                else if (rv != -ERESTARTSYS)
 502                        drbd_err(connection, "sock_recvmsg returned %d\n", rv);
 503        } else if (rv == 0) {
 504                if (test_bit(DISCONNECT_SENT, &connection->flags)) {
 505                        long t;
 506                        rcu_read_lock();
 507                        t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
 508                        rcu_read_unlock();
 509
 510                        t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
 511
 512                        if (t)
 513                                goto out;
 514                }
 515                drbd_info(connection, "sock was shut down by peer\n");
 516        }
 517
 518        if (rv != size)
 519                conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
 520
 521out:
 522        return rv;
 523}
 524
 525static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
 526{
 527        int err;
 528
 529        err = drbd_recv(connection, buf, size);
 530        if (err != size) {
 531                if (err >= 0)
 532                        err = -EIO;
 533        } else
 534                err = 0;
 535        return err;
 536}
 537
 538static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
 539{
 540        int err;
 541
 542        err = drbd_recv_all(connection, buf, size);
 543        if (err && !signal_pending(current))
 544                drbd_warn(connection, "short read (expected size %d)\n", (int)size);
 545        return err;
 546}
 547
 548/* quoting tcp(7):
 549 *   On individual connections, the socket buffer size must be set prior to the
 550 *   listen(2) or connect(2) calls in order to have it take effect.
 551 * This is our wrapper to do so.
 552 */
 553static void drbd_setbufsize(struct socket *sock, unsigned int snd,
 554                unsigned int rcv)
 555{
 556        /* open coded SO_SNDBUF, SO_RCVBUF */
 557        if (snd) {
 558                sock->sk->sk_sndbuf = snd;
 559                sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
 560        }
 561        if (rcv) {
 562                sock->sk->sk_rcvbuf = rcv;
 563                sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
 564        }
 565}
 566
 567static struct socket *drbd_try_connect(struct drbd_connection *connection)
 568{
 569        const char *what;
 570        struct socket *sock;
 571        struct sockaddr_in6 src_in6;
 572        struct sockaddr_in6 peer_in6;
 573        struct net_conf *nc;
 574        int err, peer_addr_len, my_addr_len;
 575        int sndbuf_size, rcvbuf_size, connect_int;
 576        int disconnect_on_error = 1;
 577
 578        rcu_read_lock();
 579        nc = rcu_dereference(connection->net_conf);
 580        if (!nc) {
 581                rcu_read_unlock();
 582                return NULL;
 583        }
 584        sndbuf_size = nc->sndbuf_size;
 585        rcvbuf_size = nc->rcvbuf_size;
 586        connect_int = nc->connect_int;
 587        rcu_read_unlock();
 588
 589        my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
 590        memcpy(&src_in6, &connection->my_addr, my_addr_len);
 591
 592        if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
 593                src_in6.sin6_port = 0;
 594        else
 595                ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
 596
 597        peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
 598        memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
 599
 600        what = "sock_create_kern";
 601        err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
 602                               SOCK_STREAM, IPPROTO_TCP, &sock);
 603        if (err < 0) {
 604                sock = NULL;
 605                goto out;
 606        }
 607
 608        sock->sk->sk_rcvtimeo =
 609        sock->sk->sk_sndtimeo = connect_int * HZ;
 610        drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
 611
 612       /* explicitly bind to the configured IP as source IP
 613        *  for the outgoing connections.
 614        *  This is needed for multihomed hosts and to be
 615        *  able to use lo: interfaces for drbd.
 616        * Make sure to use 0 as port number, so linux selects
 617        *  a free one dynamically.
 618        */
 619        what = "bind before connect";
 620        err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
 621        if (err < 0)
 622                goto out;
 623
 624        /* connect may fail, peer not yet available.
 625         * stay C_WF_CONNECTION, don't go Disconnecting! */
 626        disconnect_on_error = 0;
 627        what = "connect";
 628        err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
 629
 630out:
 631        if (err < 0) {
 632                if (sock) {
 633                        sock_release(sock);
 634                        sock = NULL;
 635                }
 636                switch (-err) {
 637                        /* timeout, busy, signal pending */
 638                case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
 639                case EINTR: case ERESTARTSYS:
 640                        /* peer not (yet) available, network problem */
 641                case ECONNREFUSED: case ENETUNREACH:
 642                case EHOSTDOWN:    case EHOSTUNREACH:
 643                        disconnect_on_error = 0;
 644                        break;
 645                default:
 646                        drbd_err(connection, "%s failed, err = %d\n", what, err);
 647                }
 648                if (disconnect_on_error)
 649                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
 650        }
 651
 652        return sock;
 653}
 654
 655struct accept_wait_data {
 656        struct drbd_connection *connection;
 657        struct socket *s_listen;
 658        struct completion door_bell;
 659        void (*original_sk_state_change)(struct sock *sk);
 660
 661};
 662
 663static void drbd_incoming_connection(struct sock *sk)
 664{
 665        struct accept_wait_data *ad = sk->sk_user_data;
 666        void (*state_change)(struct sock *sk);
 667
 668        state_change = ad->original_sk_state_change;
 669        if (sk->sk_state == TCP_ESTABLISHED)
 670                complete(&ad->door_bell);
 671        state_change(sk);
 672}
 673
 674static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
 675{
 676        int err, sndbuf_size, rcvbuf_size, my_addr_len;
 677        struct sockaddr_in6 my_addr;
 678        struct socket *s_listen;
 679        struct net_conf *nc;
 680        const char *what;
 681
 682        rcu_read_lock();
 683        nc = rcu_dereference(connection->net_conf);
 684        if (!nc) {
 685                rcu_read_unlock();
 686                return -EIO;
 687        }
 688        sndbuf_size = nc->sndbuf_size;
 689        rcvbuf_size = nc->rcvbuf_size;
 690        rcu_read_unlock();
 691
 692        my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
 693        memcpy(&my_addr, &connection->my_addr, my_addr_len);
 694
 695        what = "sock_create_kern";
 696        err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
 697                               SOCK_STREAM, IPPROTO_TCP, &s_listen);
 698        if (err) {
 699                s_listen = NULL;
 700                goto out;
 701        }
 702
 703        s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
 704        drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
 705
 706        what = "bind before listen";
 707        err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
 708        if (err < 0)
 709                goto out;
 710
 711        ad->s_listen = s_listen;
 712        write_lock_bh(&s_listen->sk->sk_callback_lock);
 713        ad->original_sk_state_change = s_listen->sk->sk_state_change;
 714        s_listen->sk->sk_state_change = drbd_incoming_connection;
 715        s_listen->sk->sk_user_data = ad;
 716        write_unlock_bh(&s_listen->sk->sk_callback_lock);
 717
 718        what = "listen";
 719        err = s_listen->ops->listen(s_listen, 5);
 720        if (err < 0)
 721                goto out;
 722
 723        return 0;
 724out:
 725        if (s_listen)
 726                sock_release(s_listen);
 727        if (err < 0) {
 728                if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
 729                        drbd_err(connection, "%s failed, err = %d\n", what, err);
 730                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
 731                }
 732        }
 733
 734        return -EIO;
 735}
 736
 737static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
 738{
 739        write_lock_bh(&sk->sk_callback_lock);
 740        sk->sk_state_change = ad->original_sk_state_change;
 741        sk->sk_user_data = NULL;
 742        write_unlock_bh(&sk->sk_callback_lock);
 743}
 744
 745static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
 746{
 747        int timeo, connect_int, err = 0;
 748        struct socket *s_estab = NULL;
 749        struct net_conf *nc;
 750
 751        rcu_read_lock();
 752        nc = rcu_dereference(connection->net_conf);
 753        if (!nc) {
 754                rcu_read_unlock();
 755                return NULL;
 756        }
 757        connect_int = nc->connect_int;
 758        rcu_read_unlock();
 759
 760        timeo = connect_int * HZ;
 761        /* 28.5% random jitter */
 762        timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
 763
 764        err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
 765        if (err <= 0)
 766                return NULL;
 767
 768        err = kernel_accept(ad->s_listen, &s_estab, 0);
 769        if (err < 0) {
 770                if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
 771                        drbd_err(connection, "accept failed, err = %d\n", err);
 772                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
 773                }
 774        }
 775
 776        if (s_estab)
 777                unregister_state_change(s_estab->sk, ad);
 778
 779        return s_estab;
 780}
 781
 782static int decode_header(struct drbd_connection *, void *, struct packet_info *);
 783
 784static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
 785                             enum drbd_packet cmd)
 786{
 787        if (!conn_prepare_command(connection, sock))
 788                return -EIO;
 789        return conn_send_command(connection, sock, cmd, 0, NULL, 0);
 790}
 791
 792static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
 793{
 794        unsigned int header_size = drbd_header_size(connection);
 795        struct packet_info pi;
 796        struct net_conf *nc;
 797        int err;
 798
 799        rcu_read_lock();
 800        nc = rcu_dereference(connection->net_conf);
 801        if (!nc) {
 802                rcu_read_unlock();
 803                return -EIO;
 804        }
 805        sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
 806        rcu_read_unlock();
 807
 808        err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
 809        if (err != header_size) {
 810                if (err >= 0)
 811                        err = -EIO;
 812                return err;
 813        }
 814        err = decode_header(connection, connection->data.rbuf, &pi);
 815        if (err)
 816                return err;
 817        return pi.cmd;
 818}
 819
 820/**
 821 * drbd_socket_okay() - Free the socket if its connection is not okay
 822 * @sock:       pointer to the pointer to the socket.
 823 */
 824static bool drbd_socket_okay(struct socket **sock)
 825{
 826        int rr;
 827        char tb[4];
 828
 829        if (!*sock)
 830                return false;
 831
 832        rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
 833
 834        if (rr > 0 || rr == -EAGAIN) {
 835                return true;
 836        } else {
 837                sock_release(*sock);
 838                *sock = NULL;
 839                return false;
 840        }
 841}
 842
 843static bool connection_established(struct drbd_connection *connection,
 844                                   struct socket **sock1,
 845                                   struct socket **sock2)
 846{
 847        struct net_conf *nc;
 848        int timeout;
 849        bool ok;
 850
 851        if (!*sock1 || !*sock2)
 852                return false;
 853
 854        rcu_read_lock();
 855        nc = rcu_dereference(connection->net_conf);
 856        timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
 857        rcu_read_unlock();
 858        schedule_timeout_interruptible(timeout);
 859
 860        ok = drbd_socket_okay(sock1);
 861        ok = drbd_socket_okay(sock2) && ok;
 862
 863        return ok;
 864}
 865
 866/* Gets called if a connection is established, or if a new minor gets created
 867   in a connection */
 868int drbd_connected(struct drbd_peer_device *peer_device)
 869{
 870        struct drbd_device *device = peer_device->device;
 871        int err;
 872
 873        atomic_set(&device->packet_seq, 0);
 874        device->peer_seq = 0;
 875
 876        device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
 877                &peer_device->connection->cstate_mutex :
 878                &device->own_state_mutex;
 879
 880        err = drbd_send_sync_param(peer_device);
 881        if (!err)
 882                err = drbd_send_sizes(peer_device, 0, 0);
 883        if (!err)
 884                err = drbd_send_uuids(peer_device);
 885        if (!err)
 886                err = drbd_send_current_state(peer_device);
 887        clear_bit(USE_DEGR_WFC_T, &device->flags);
 888        clear_bit(RESIZE_PENDING, &device->flags);
 889        atomic_set(&device->ap_in_flight, 0);
 890        mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
 891        return err;
 892}
 893
 894/*
 895 * return values:
 896 *   1 yes, we have a valid connection
 897 *   0 oops, did not work out, please try again
 898 *  -1 peer talks different language,
 899 *     no point in trying again, please go standalone.
 900 *  -2 We do not have a network config...
 901 */
 902static int conn_connect(struct drbd_connection *connection)
 903{
 904        struct drbd_socket sock, msock;
 905        struct drbd_peer_device *peer_device;
 906        struct net_conf *nc;
 907        int vnr, timeout, h;
 908        bool discard_my_data, ok;
 909        enum drbd_state_rv rv;
 910        struct accept_wait_data ad = {
 911                .connection = connection,
 912                .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
 913        };
 914
 915        clear_bit(DISCONNECT_SENT, &connection->flags);
 916        if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
 917                return -2;
 918
 919        mutex_init(&sock.mutex);
 920        sock.sbuf = connection->data.sbuf;
 921        sock.rbuf = connection->data.rbuf;
 922        sock.socket = NULL;
 923        mutex_init(&msock.mutex);
 924        msock.sbuf = connection->meta.sbuf;
 925        msock.rbuf = connection->meta.rbuf;
 926        msock.socket = NULL;
 927
 928        /* Assume that the peer only understands protocol 80 until we know better.  */
 929        connection->agreed_pro_version = 80;
 930
 931        if (prepare_listen_socket(connection, &ad))
 932                return 0;
 933
 934        do {
 935                struct socket *s;
 936
 937                s = drbd_try_connect(connection);
 938                if (s) {
 939                        if (!sock.socket) {
 940                                sock.socket = s;
 941                                send_first_packet(connection, &sock, P_INITIAL_DATA);
 942                        } else if (!msock.socket) {
 943                                clear_bit(RESOLVE_CONFLICTS, &connection->flags);
 944                                msock.socket = s;
 945                                send_first_packet(connection, &msock, P_INITIAL_META);
 946                        } else {
 947                                drbd_err(connection, "Logic error in conn_connect()\n");
 948                                goto out_release_sockets;
 949                        }
 950                }
 951
 952                if (connection_established(connection, &sock.socket, &msock.socket))
 953                        break;
 954
 955retry:
 956                s = drbd_wait_for_connect(connection, &ad);
 957                if (s) {
 958                        int fp = receive_first_packet(connection, s);
 959                        drbd_socket_okay(&sock.socket);
 960                        drbd_socket_okay(&msock.socket);
 961                        switch (fp) {
 962                        case P_INITIAL_DATA:
 963                                if (sock.socket) {
 964                                        drbd_warn(connection, "initial packet S crossed\n");
 965                                        sock_release(sock.socket);
 966                                        sock.socket = s;
 967                                        goto randomize;
 968                                }
 969                                sock.socket = s;
 970                                break;
 971                        case P_INITIAL_META:
 972                                set_bit(RESOLVE_CONFLICTS, &connection->flags);
 973                                if (msock.socket) {
 974                                        drbd_warn(connection, "initial packet M crossed\n");
 975                                        sock_release(msock.socket);
 976                                        msock.socket = s;
 977                                        goto randomize;
 978                                }
 979                                msock.socket = s;
 980                                break;
 981                        default:
 982                                drbd_warn(connection, "Error receiving initial packet\n");
 983                                sock_release(s);
 984randomize:
 985                                if (prandom_u32() & 1)
 986                                        goto retry;
 987                        }
 988                }
 989
 990                if (connection->cstate <= C_DISCONNECTING)
 991                        goto out_release_sockets;
 992                if (signal_pending(current)) {
 993                        flush_signals(current);
 994                        smp_rmb();
 995                        if (get_t_state(&connection->receiver) == EXITING)
 996                                goto out_release_sockets;
 997                }
 998
 999                ok = connection_established(connection, &sock.socket, &msock.socket);
1000        } while (!ok);
1001
1002        if (ad.s_listen)
1003                sock_release(ad.s_listen);
1004
1005        sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1006        msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1007
1008        sock.socket->sk->sk_allocation = GFP_NOIO;
1009        msock.socket->sk->sk_allocation = GFP_NOIO;
1010
1011        sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1012        msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1013
1014        /* NOT YET ...
1015         * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1016         * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1017         * first set it to the P_CONNECTION_FEATURES timeout,
1018         * which we set to 4x the configured ping_timeout. */
1019        rcu_read_lock();
1020        nc = rcu_dereference(connection->net_conf);
1021
1022        sock.socket->sk->sk_sndtimeo =
1023        sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1024
1025        msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1026        timeout = nc->timeout * HZ / 10;
1027        discard_my_data = nc->discard_my_data;
1028        rcu_read_unlock();
1029
1030        msock.socket->sk->sk_sndtimeo = timeout;
1031
1032        /* we don't want delays.
1033         * we use TCP_CORK where appropriate, though */
1034        drbd_tcp_nodelay(sock.socket);
1035        drbd_tcp_nodelay(msock.socket);
1036
1037        connection->data.socket = sock.socket;
1038        connection->meta.socket = msock.socket;
1039        connection->last_received = jiffies;
1040
1041        h = drbd_do_features(connection);
1042        if (h <= 0)
1043                return h;
1044
1045        if (connection->cram_hmac_tfm) {
1046                /* drbd_request_state(device, NS(conn, WFAuth)); */
1047                switch (drbd_do_auth(connection)) {
1048                case -1:
1049                        drbd_err(connection, "Authentication of peer failed\n");
1050                        return -1;
1051                case 0:
1052                        drbd_err(connection, "Authentication of peer failed, trying again.\n");
1053                        return 0;
1054                }
1055        }
1056
1057        connection->data.socket->sk->sk_sndtimeo = timeout;
1058        connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1059
1060        if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1061                return -1;
1062
1063        /* Prevent a race between resync-handshake and
1064         * being promoted to Primary.
1065         *
1066         * Grab and release the state mutex, so we know that any current
1067         * drbd_set_role() is finished, and any incoming drbd_set_role
1068         * will see the STATE_SENT flag, and wait for it to be cleared.
1069         */
1070        idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1071                mutex_lock(peer_device->device->state_mutex);
1072
1073        set_bit(STATE_SENT, &connection->flags);
1074
1075        idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1076                mutex_unlock(peer_device->device->state_mutex);
1077
1078        rcu_read_lock();
1079        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1080                struct drbd_device *device = peer_device->device;
1081                kref_get(&device->kref);
1082                rcu_read_unlock();
1083
1084                if (discard_my_data)
1085                        set_bit(DISCARD_MY_DATA, &device->flags);
1086                else
1087                        clear_bit(DISCARD_MY_DATA, &device->flags);
1088
1089                drbd_connected(peer_device);
1090                kref_put(&device->kref, drbd_destroy_device);
1091                rcu_read_lock();
1092        }
1093        rcu_read_unlock();
1094
1095        rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1096        if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1097                clear_bit(STATE_SENT, &connection->flags);
1098                return 0;
1099        }
1100
1101        drbd_thread_start(&connection->asender);
1102
1103        mutex_lock(&connection->resource->conf_update);
1104        /* The discard_my_data flag is a single-shot modifier to the next
1105         * connection attempt, the handshake of which is now well underway.
1106         * No need for rcu style copying of the whole struct
1107         * just to clear a single value. */
1108        connection->net_conf->discard_my_data = 0;
1109        mutex_unlock(&connection->resource->conf_update);
1110
1111        return h;
1112
1113out_release_sockets:
1114        if (ad.s_listen)
1115                sock_release(ad.s_listen);
1116        if (sock.socket)
1117                sock_release(sock.socket);
1118        if (msock.socket)
1119                sock_release(msock.socket);
1120        return -1;
1121}
1122
1123static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1124{
1125        unsigned int header_size = drbd_header_size(connection);
1126
1127        if (header_size == sizeof(struct p_header100) &&
1128            *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1129                struct p_header100 *h = header;
1130                if (h->pad != 0) {
1131                        drbd_err(connection, "Header padding is not zero\n");
1132                        return -EINVAL;
1133                }
1134                pi->vnr = be16_to_cpu(h->volume);
1135                pi->cmd = be16_to_cpu(h->command);
1136                pi->size = be32_to_cpu(h->length);
1137        } else if (header_size == sizeof(struct p_header95) &&
1138                   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1139                struct p_header95 *h = header;
1140                pi->cmd = be16_to_cpu(h->command);
1141                pi->size = be32_to_cpu(h->length);
1142                pi->vnr = 0;
1143        } else if (header_size == sizeof(struct p_header80) &&
1144                   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1145                struct p_header80 *h = header;
1146                pi->cmd = be16_to_cpu(h->command);
1147                pi->size = be16_to_cpu(h->length);
1148                pi->vnr = 0;
1149        } else {
1150                drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1151                         be32_to_cpu(*(__be32 *)header),
1152                         connection->agreed_pro_version);
1153                return -EINVAL;
1154        }
1155        pi->data = header + header_size;
1156        return 0;
1157}
1158
1159static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1160{
1161        void *buffer = connection->data.rbuf;
1162        int err;
1163
1164        err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1165        if (err)
1166                return err;
1167
1168        err = decode_header(connection, buffer, pi);
1169        connection->last_received = jiffies;
1170
1171        return err;
1172}
1173
1174static void drbd_flush(struct drbd_connection *connection)
1175{
1176        int rv;
1177        struct drbd_peer_device *peer_device;
1178        int vnr;
1179
1180        if (connection->resource->write_ordering >= WO_bdev_flush) {
1181                rcu_read_lock();
1182                idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1183                        struct drbd_device *device = peer_device->device;
1184
1185                        if (!get_ldev(device))
1186                                continue;
1187                        kref_get(&device->kref);
1188                        rcu_read_unlock();
1189
1190                        /* Right now, we have only this one synchronous code path
1191                         * for flushes between request epochs.
1192                         * We may want to make those asynchronous,
1193                         * or at least parallelize the flushes to the volume devices.
1194                         */
1195                        device->flush_jif = jiffies;
1196                        set_bit(FLUSH_PENDING, &device->flags);
1197                        rv = blkdev_issue_flush(device->ldev->backing_bdev,
1198                                        GFP_NOIO, NULL);
1199                        clear_bit(FLUSH_PENDING, &device->flags);
1200                        if (rv) {
1201                                drbd_info(device, "local disk flush failed with status %d\n", rv);
1202                                /* would rather check on EOPNOTSUPP, but that is not reliable.
1203                                 * don't try again for ANY return value != 0
1204                                 * if (rv == -EOPNOTSUPP) */
1205                                drbd_bump_write_ordering(connection->resource, NULL, WO_drain_io);
1206                        }
1207                        put_ldev(device);
1208                        kref_put(&device->kref, drbd_destroy_device);
1209
1210                        rcu_read_lock();
1211                        if (rv)
1212                                break;
1213                }
1214                rcu_read_unlock();
1215        }
1216}
1217
1218/**
1219 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1220 * @connection: DRBD connection.
1221 * @epoch:      Epoch object.
1222 * @ev:         Epoch event.
     *
     * Runs under connection->epoch_lock; the lock is dropped only around
     * drbd_send_b_ack().  An epoch is finished once it is non-empty, has no
     * active users left and either has its barrier number or EV_CLEANUP is set
     * in @ev.  A finished epoch that is not the current one is unlinked and
     * freed, and the loop goes on with the next epoch on the list, using
     * EV_BECAME_LAST (plus EV_CLEANUP if that was set).  A finished current
     * epoch is reset for reuse instead.
     *
     * Returns FE_STILL_LIVE if no epoch was finished, otherwise FE_DESTROYED or
     * FE_RECYCLED according to what happened to the first finished epoch.
1223 */
1224static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1225                                               struct drbd_epoch *epoch,
1226                                               enum epoch_event ev)
1227{
1228        int epoch_size;
1229        struct drbd_epoch *next_epoch;
1230        enum finish_epoch rv = FE_STILL_LIVE;
1231
1232        spin_lock(&connection->epoch_lock);
1233        do {
1234                next_epoch = NULL;
1235
                    /* sampled before the event is applied below */
1236                epoch_size = atomic_read(&epoch->epoch_size);
1237
                    /* EV_CLEANUP is a modifier bit, mask it off to get the event */
1238                switch (ev & ~EV_CLEANUP) {
1239                case EV_PUT:
1240                        atomic_dec(&epoch->active);
1241                        break;
1242                case EV_GOT_BARRIER_NR:
1243                        set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1244                        break;
1245                case EV_BECAME_LAST:
1246                        /* nothing to do*/
1247                        break;
1248                }
1249
1250                if (epoch_size != 0 &&
1251                    atomic_read(&epoch->active) == 0 &&
1252                    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
                            /* no barrier ack is sent during cleanup */
1253                        if (!(ev & EV_CLEANUP)) {
1254                                spin_unlock(&connection->epoch_lock);
1255                                drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1256                                spin_lock(&connection->epoch_lock);
1257                        }
1258#if 0
1259                        /* FIXME: dec unacked on connection, once we have
1260                         * something to count pending connection packets in. */
1261                        if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1262                                dec_unacked(epoch->connection);
1263#endif
1264
1265                        if (connection->current_epoch != epoch) {
                                    /* pick up the successor before this epoch is freed */
1266                                next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1267                                list_del(&epoch->list);
1268                                ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1269                                connection->epochs--;
1270                                kfree(epoch);
1271
                                    /* only the first finished epoch decides the return value */
1272                                if (rv == FE_STILL_LIVE)
1273                                        rv = FE_DESTROYED;
1274                        } else {
                                    /* the current epoch is reused rather than freed */
1275                                epoch->flags = 0;
1276                                atomic_set(&epoch->epoch_size, 0);
1277                                /* atomic_set(&epoch->active, 0); is already zero */
1278                                if (rv == FE_STILL_LIVE)
1279                                        rv = FE_RECYCLED;
1280                        }
1281                }
1282
1283                if (!next_epoch)
1284                        break;
1285
1286                epoch = next_epoch;
1287        } while (1);
1288
1289        spin_unlock(&connection->epoch_lock);
1290
1291        return rv;
1292}
1293
1294static enum write_ordering_e
1295max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1296{
1297        struct disk_conf *dc;
1298
1299        dc = rcu_dereference(bdev->disk_conf);
1300
1301        if (wo == WO_bdev_flush && !dc->disk_flushes)
1302                wo = WO_drain_io;
1303        if (wo == WO_drain_io && !dc->disk_drain)
1304                wo = WO_none;
1305
1306        return wo;
1307}
1308
1309/**
1310 * drbd_bump_write_ordering() - Fall back to an other write ordering method
1311 * @connection: DRBD connection.
1312 * @wo:         Write ordering method to try.
1313 */
1314void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1315                              enum write_ordering_e wo)
1316{
1317        struct drbd_device *device;
1318        enum write_ordering_e pwo;
1319        int vnr;
1320        static char *write_ordering_str[] = {
1321                [WO_none] = "none",
1322                [WO_drain_io] = "drain",
1323                [WO_bdev_flush] = "flush",
1324        };
1325
1326        pwo = resource->write_ordering;
1327        if (wo != WO_bdev_flush)
1328                wo = min(pwo, wo);
1329        rcu_read_lock();
1330        idr_for_each_entry(&resource->devices, device, vnr) {
1331                if (get_ldev(device)) {
1332                        wo = max_allowed_wo(device->ldev, wo);
1333                        if (device->ldev == bdev)
1334                                bdev = NULL;
1335                        put_ldev(device);
1336                }
1337        }
1338
1339        if (bdev)
1340                wo = max_allowed_wo(bdev, wo);
1341
1342        rcu_read_unlock();
1343
1344        resource->write_ordering = wo;
1345        if (pwo != resource->write_ordering || wo == WO_bdev_flush)
1346                drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1347}
1348
1349/**
1350 * drbd_submit_peer_request()
1351 * @device:     DRBD device.
1352 * @peer_req:   peer request
1353 * @rw:         flag field, see bio->bi_rw
     * @fault_type: handed to drbd_generic_make_request() for each bio
1354 *
1355 * May spread the pages to multiple bios,
1356 * depending on bio_add_page restrictions.
1357 *
     * Requests flagged EE_IS_TRIM_USE_ZEROOUT are not turned into bios at all:
     * they are zeroed out synchronously via blkdev_issue_zeroout() and
     * completed right here (return value 0, a failure is recorded as
     * EE_WAS_ERROR in peer_req->flags).
     *
1358 * Returns 0 if all bios have been submitted,
1359 * -ENOMEM if we could not allocate enough bios,
1360 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1361 *  single page to an empty bio (which should never happen and likely indicates
1362 *  that the lower level IO stack is in some way broken). This has been observed
1363 *  on certain Xen deployments.
1364 */
1365/* TODO allocate from our own bio_set. */
1366int drbd_submit_peer_request(struct drbd_device *device,
1367                             struct drbd_peer_request *peer_req,
1368                             const unsigned rw, const int fault_type)
1369{
            /* singly linked list of allocated bios, chained through bi_next */
1370        struct bio *bios = NULL;
1371        struct bio *bio;
1372        struct page *page = peer_req->pages;
1373        sector_t sector = peer_req->i.sector;
1374        unsigned data_size = peer_req->i.size;
1375        unsigned n_bios = 0;
            /* number of pages needed for data_size bytes, rounded up */
1376        unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
            /* default error for the fail path: bio allocation failure */
1377        int err = -ENOMEM;
1378
1379        if (peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) {
1380                /* wait for all pending IO completions, before we start
1381                 * zeroing things out. */
1382                conn_wait_active_ee_empty(first_peer_device(device)->connection);
1383                /* add it to the active list now,
1384                 * so we can find it to present it in debugfs */
1385                peer_req->submit_jif = jiffies;
1386                peer_req->flags |= EE_SUBMITTED;
1387                spin_lock_irq(&device->resource->req_lock);
1388                list_add_tail(&peer_req->w.list, &device->active_ee);
1389                spin_unlock_irq(&device->resource->req_lock);
                    /* data_size is in bytes, the zeroout call takes 512 byte sectors */
1390                if (blkdev_issue_zeroout(device->ldev->backing_bdev,
1391                        sector, data_size >> 9, GFP_NOIO, false))
1392                        peer_req->flags |= EE_WAS_ERROR;
1393                drbd_endio_write_sec_final(peer_req);
1394                return 0;
1395        }
1396
1397        /* Discards don't have any payload.
1398         * But the scsi layer still expects a bio_vec it can use internally,
1399         * see sd_setup_discard_cmnd() and blk_add_request_payload(). */
1400        if (peer_req->flags & EE_IS_TRIM)
1401                nr_pages = 1;
1402
1403        /* In most cases, we will only need one bio.  But in case the lower
1404         * level restrictions happen to be different at this offset on this
1405         * side than those of the sending peer, we may need to submit the
1406         * request in more than one bio.
1407         *
1408         * Plain bio_alloc is good enough here, this is no DRBD internally
1409         * generated bio, but a bio allocated on behalf of the peer.
1410         */
1411next_bio:
            /* nr_pages is the number of pages not yet added to any bio */
1412        bio = bio_alloc(GFP_NOIO, nr_pages);
1413        if (!bio) {
1414                drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1415                goto fail;
1416        }
1417        /* > peer_req->i.sector, unless this is the first bio */
1418        bio->bi_iter.bi_sector = sector;
1419        bio->bi_bdev = device->ldev->backing_bdev;
1420        bio->bi_rw = rw;
1421        bio->bi_private = peer_req;
1422        bio->bi_end_io = drbd_peer_request_endio;
1423
            /* push onto the front of the list; submitted or freed below */
1424        bio->bi_next = bios;
1425        bios = bio;
1426        ++n_bios;
1427
            /* a discard carries only a size, no pages are added */
1428        if (rw & REQ_DISCARD) {
1429                bio->bi_iter.bi_size = data_size;
1430                goto submit;
1431        }
1432
            /* NOTE(review): after "goto next_bio" this loop is entered again with
             * the page that could not be added; presumably page_chain_for_each()
             * resumes from the current value of page -- confirm against the macro. */
1433        page_chain_for_each(page) {
1434                unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1435                if (!bio_add_page(bio, page, len, 0)) {
1436                        /* A single page must always be possible!
1437                         * But in case it fails anyways,
1438                         * we deal with it, and complain (below). */
1439                        if (bio->bi_vcnt == 0) {
1440                                drbd_err(device,
1441                                        "bio_add_page failed for len=%u, "
1442                                        "bi_vcnt=0 (bi_sector=%llu)\n",
1443                                        len, (uint64_t)bio->bi_iter.bi_sector);
1444                                err = -ENOSPC;
1445                                goto fail;
1446                        }
1447                        goto next_bio;
1448                }
1449                data_size -= len;
1450                sector += len >> 9;
1451                --nr_pages;
1452        }
1453        D_ASSERT(device, data_size == 0);
1454submit:
1455        D_ASSERT(device, page == NULL);
1456
            /* set the count before the first bio is handed down */
1457        atomic_set(&peer_req->pending_bios, n_bios);
1458        /* for debugfs: update timestamp, mark as submitted */
1459        peer_req->submit_jif = jiffies;
1460        peer_req->flags |= EE_SUBMITTED;
1461        do {
1462                bio = bios;
1463                bios = bios->bi_next;
1464                bio->bi_next = NULL;
1465
1466                drbd_generic_make_request(device, fault_type, bio);
1467        } while (bios);
1468        return 0;
1469
1470fail:
            /* nothing has been submitted yet: drop every bio allocated so far */
1471        while (bios) {
1472                bio = bios;
1473                bios = bios->bi_next;
1474                bio_put(bio);
1475        }
1476        return err;
1477}
1478
1479static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1480                                             struct drbd_peer_request *peer_req)
1481{
1482        struct drbd_interval *i = &peer_req->i;
1483
1484        drbd_remove_interval(&device->write_requests, i);
1485        drbd_clear_interval(i);
1486
1487        /* Wake up any processes waiting for this peer request to complete.  */
1488        if (i->waiting)
1489                wake_up(&device->misc_wait);
1490}
1491
1492static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1493{
1494        struct drbd_peer_device *peer_device;
1495        int vnr;
1496
1497        rcu_read_lock();
1498        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1499                struct drbd_device *device = peer_device->device;
1500
1501                kref_get(&device->kref);
1502                rcu_read_unlock();
1503                drbd_wait_ee_list_empty(device, &device->active_ee);
1504                kref_put(&device->kref, drbd_destroy_device);
1505                rcu_read_lock();
1506        }
1507        rcu_read_unlock();
1508}
1509
1510static struct drbd_peer_device *
1511conn_peer_device(struct drbd_connection *connection, int volume_number)
1512{
1513        return idr_find(&connection->peer_devices, volume_number);
1514}
1515
    /*
     * receive_Barrier() - Handle a barrier packet from the peer
     * @connection: connection the packet arrived on.
     * @pi:         packet info; pi->data is read as a struct p_barrier.
     *
     * Stores the barrier number in the current epoch and signals
     * EV_GOT_BARRIER_NR to it.  Depending on the resource's write ordering
     * method it then waits for active peer requests and flushes, and, if the
     * current epoch is still non-empty, installs a freshly allocated epoch as
     * the new current one.
     *
     * Returns 0, or -EIO if the write ordering method is not one of the
     * values handled in the switch.  A failed epoch allocation is not
     * reported as an error.
     */
1516static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1517{
1518        int rv;
1519        struct p_barrier *p = pi->data;
1520        struct drbd_epoch *epoch;
1521
1522        /* FIXME these are unacked on connection,
1523         * not a specific (peer)device.
1524         */
1525        connection->current_epoch->barrier_nr = p->barrier;
1526        connection->current_epoch->connection = connection;
1527        rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1528
1529        /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1530         * the activity log, which means it would not be resynced in case the
1531         * R_PRIMARY crashes now.
1532         * Therefore we must send the barrier_ack after the barrier request was
1533         * completed. */
1534        switch (connection->resource->write_ordering) {
1535        case WO_none:
                    /* the current epoch was finished and reset; keep using it */
1536                if (rv == FE_RECYCLED)
1537                        return 0;
1538
1539                /* receiver context, in the writeout path of the other node.
1540                 * avoid potential distributed deadlock */
1541                epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1542                if (epoch)
1543                        break;
1544                else
1545                        drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1546                        /* Fall through */
1547
1548        case WO_bdev_flush:
1549        case WO_drain_io:
1550                conn_wait_active_ee_empty(connection);
1551                drbd_flush(connection);
1552
                    /* a new epoch is only needed if the current one is not empty */
1553                if (atomic_read(&connection->current_epoch->epoch_size)) {
1554                        epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1555                        if (epoch)
1556                                break;
1557                }
1558
1559                return 0;
1560        default:
1561                drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1562                         connection->resource->write_ordering);
1563                return -EIO;
1564        }
1565
            /* only reached with a freshly allocated epoch */
1566        epoch->flags = 0;
1567        atomic_set(&epoch->epoch_size, 0);
1568        atomic_set(&epoch->active, 0);
1569
1570        spin_lock(&connection->epoch_lock);
            /* re-check under the lock before linking the new epoch in */
1571        if (atomic_read(&connection->current_epoch->epoch_size)) {
1572                list_add(&epoch->list, &connection->current_epoch->list);
1573                connection->current_epoch = epoch;
1574                connection->epochs++;
1575        } else {
1576                /* The current_epoch got recycled while we allocated this one... */
1577                kfree(epoch);
1578        }
1579        spin_unlock(&connection->epoch_lock);
1580
1581        return 0;
1582}
1583
1584/* used from receive_RSDataReply (recv_resync_read)
1585 * and from receive_Data */
1586static struct drbd_peer_request *
1587read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1588              struct packet_info *pi) __must_hold(local)
1589{
1590        struct drbd_device *device = peer_device->device;
1591        const sector_t capacity = drbd_get_capacity(device->this_bdev);
1592        struct drbd_peer_request *peer_req;
1593        struct page *page;
1594        int digest_size, err;
1595        unsigned int data_size = pi->size, ds;
1596        void *dig_in = peer_device->connection->int_dig_in;
1597        void *dig_vv = peer_device->connection->int_dig_vv;
1598        unsigned long *data;
1599        struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1600
1601        digest_size = 0;
1602        if (!trim && peer_device->connection->peer_integrity_tfm) {
1603                digest_size = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1604                /*
1605                 * FIXME: Receive the incoming digest into the receive buffer
1606                 *        here, together with its struct p_data?
1607                 */
1608                err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1609                if (err)
1610                        return NULL;
1611                data_size -= digest_size;
1612        }
1613
1614        if (trim) {
1615                D_ASSERT(peer_device, data_size == 0);
1616                data_size = be32_to_cpu(trim->size);
1617        }
1618
1619        if (!expect(IS_ALIGNED(data_size, 512)))
1620                return NULL;
1621        /* prepare for larger trim requests. */
1622        if (!trim && !expect(data_size <= DRBD_MAX_BIO_SIZE))
1623                return NULL;
1624
1625        /* even though we trust out peer,
1626         * we sometimes have to double check. */
1627        if (sector + (data_size>>9) > capacity) {
1628                drbd_err(device, "request from peer beyond end of local disk: "
1629                        "capacity: %llus < sector: %llus + size: %u\n",
1630                        (unsigned long long)capacity,
1631                        (unsigned long long)sector, data_size);
1632                return NULL;
1633        }
1634
1635        /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1636         * "criss-cross" setup, that might cause write-out on some other DRBD,
1637         * which in turn might block on the other node at this very place.  */
1638        peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, trim == NULL, GFP_NOIO);
1639        if (!peer_req)
1640                return NULL;
1641
1642        peer_req->flags |= EE_WRITE;
1643        if (trim)
1644                return peer_req;
1645
1646        ds = data_size;
1647        page = peer_req->pages;
1648        page_chain_for_each(page) {
1649                unsigned len = min_t(int, ds, PAGE_SIZE);
1650                data = kmap(page);
1651                err = drbd_recv_all_warn(peer_device->connection, data, len);
1652                if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1653                        drbd_err(device, "Fault injection: Corrupting data on receive\n");
1654                        data[0] = data[0] ^ (unsigned long)-1;
1655                }
1656                kunmap(page);
1657                if (err) {
1658                        drbd_free_peer_req(device, peer_req);
1659                        return NULL;
1660                }
1661                ds -= len;
1662        }
1663
1664        if (digest_size) {
1665                drbd_csum_ee(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv);
1666                if (memcmp(dig_in, dig_vv, digest_size)) {
1667                        drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1668                                (unsigned long long)sector, data_size);
1669                        drbd_free_peer_req(device, peer_req);
1670                        return NULL;
1671                }
1672        }
1673        device->recv_cnt += data_size >> 9;
1674        return peer_req;
1675}
1676
1677/* drbd_drain_block() just takes a data block
1678 * out of the socket input buffer, and discards it.
1679 */
1680static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1681{
1682        struct page *page;
1683        int err = 0;
1684        void *data;
1685
1686        if (!data_size)
1687                return 0;
1688
1689        page = drbd_alloc_pages(peer_device, 1, 1);
1690
1691        data = kmap(page);
1692        while (data_size) {
1693                unsigned int len = min_t(int, data_size, PAGE_SIZE);
1694
1695                err = drbd_recv_all_warn(peer_device->connection, data, len);
1696                if (err)
1697                        break;
1698                data_size -= len;
1699        }
1700        kunmap(page);
1701        drbd_free_pages(peer_device->device, page, 0);
1702        return err;
1703}
1704
1705static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1706                           sector_t sector, int data_size)
1707{
1708        struct bio_vec bvec;
1709        struct bvec_iter iter;
1710        struct bio *bio;
1711        int digest_size, err, expect;
1712        void *dig_in = peer_device->connection->int_dig_in;
1713        void *dig_vv = peer_device->connection->int_dig_vv;
1714
1715        digest_size = 0;
1716        if (peer_device->connection->peer_integrity_tfm) {
1717                digest_size = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1718                err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1719                if (err)
1720                        return err;
1721                data_size -= digest_size;
1722        }
1723
1724        /* optimistically update recv_cnt.  if receiving fails below,
1725         * we disconnect anyways, and counters will be reset. */
1726        peer_device->device->recv_cnt += data_size>>9;
1727
1728        bio = req->master_bio;
1729        D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1730
1731        bio_for_each_segment(bvec, bio, iter) {
1732                void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1733                expect = min_t(int, data_size, bvec.bv_len);
1734                err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1735                kunmap(bvec.bv_page);
1736                if (err)
1737                        return err;
1738                data_size -= expect;
1739        }
1740
1741        if (digest_size) {
1742                drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1743                if (memcmp(dig_in, dig_vv, digest_size)) {
1744                        drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1745                        return -EINVAL;
1746                }
1747        }
1748
1749        D_ASSERT(peer_device->device, data_size == 0);
1750        return 0;
1751}
1752
1753/*
1754 * e_end_resync_block() is called in asender context via
1755 * drbd_finish_peer_reqs().
1756 */
1757static int e_end_resync_block(struct drbd_work *w, int unused)
1758{
1759        struct drbd_peer_request *peer_req =
1760                container_of(w, struct drbd_peer_request, w);
1761        struct drbd_peer_device *peer_device = peer_req->peer_device;
1762        struct drbd_device *device = peer_device->device;
1763        sector_t sector = peer_req->i.sector;
1764        int err;
1765
1766        D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1767
1768        if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1769                drbd_set_in_sync(device, sector, peer_req->i.size);
1770                err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1771        } else {
1772                /* Record failure to sync */
1773                drbd_rs_failed_io(device, sector, peer_req->i.size);
1774
1775                err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1776        }
1777        dec_unacked(device);
1778
1779        return err;
1780}
1781
1782static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1783                            struct packet_info *pi) __releases(local)
1784{
1785        struct drbd_device *device = peer_device->device;
1786        struct drbd_peer_request *peer_req;
1787
1788        peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
1789        if (!peer_req)
1790                goto fail;
1791
1792        dec_rs_pending(device);
1793
1794        inc_unacked(device);
1795        /* corresponding dec_unacked() in e_end_resync_block()
1796         * respective _drbd_clear_done_ee */
1797
1798        peer_req->w.cb = e_end_resync_block;
1799        peer_req->submit_jif = jiffies;
1800
1801        spin_lock_irq(&device->resource->req_lock);
1802        list_add_tail(&peer_req->w.list, &device->sync_ee);
1803        spin_unlock_irq(&device->resource->req_lock);
1804
1805        atomic_add(pi->size >> 9, &device->rs_sect_ev);
1806        if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1807                return 0;
1808
1809        /* don't care for the reason here */
1810        drbd_err(device, "submit failed, triggering re-connect\n");
1811        spin_lock_irq(&device->resource->req_lock);
1812        list_del(&peer_req->w.list);
1813        spin_unlock_irq(&device->resource->req_lock);
1814
1815        drbd_free_peer_req(device, peer_req);
1816fail:
1817        put_ldev(device);
1818        return -EIO;
1819}
1820
1821static struct drbd_request *
1822find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1823             sector_t sector, bool missing_ok, const char *func)
1824{
1825        struct drbd_request *req;
1826
1827        /* Request object according to our peer */
1828        req = (struct drbd_request *)(unsigned long)id;
1829        if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1830                return req;
1831        if (!missing_ok) {
1832                drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1833                        (unsigned long)id, (unsigned long long)sector);
1834        }
1835        return NULL;
1836}
1837
1838static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
1839{
1840        struct drbd_peer_device *peer_device;
1841        struct drbd_device *device;
1842        struct drbd_request *req;
1843        sector_t sector;
1844        int err;
1845        struct p_data *p = pi->data;
1846
1847        peer_device = conn_peer_device(connection, pi->vnr);
1848        if (!peer_device)
1849                return -EIO;
1850        device = peer_device->device;
1851
1852        sector = be64_to_cpu(p->sector);
1853
1854        spin_lock_irq(&device->resource->req_lock);
1855        req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
1856        spin_unlock_irq(&device->resource->req_lock);
1857        if (unlikely(!req))
1858                return -EIO;
1859
1860        /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1861         * special casing it there for the various failure cases.
1862         * still no race with drbd_fail_pending_reads */
1863        err = recv_dless_read(peer_device, req, sector, pi->size);
1864        if (!err)
1865                req_mod(req, DATA_RECEIVED);
1866        /* else: nothing. handled from drbd_disconnect...
1867         * I don't think we may complete this just yet
1868         * in case we are "on-disconnect: freeze" */
1869
1870        return err;
1871}
1872
1873static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
1874{
1875        struct drbd_peer_device *peer_device;
1876        struct drbd_device *device;
1877        sector_t sector;
1878        int err;
1879        struct p_data *p = pi->data;
1880
1881        peer_device = conn_peer_device(connection, pi->vnr);
1882        if (!peer_device)
1883                return -EIO;
1884        device = peer_device->device;
1885
1886        sector = be64_to_cpu(p->sector);
1887        D_ASSERT(device, p->block_id == ID_SYNCER);
1888
1889        if (get_ldev(device)) {
1890                /* data is submitted to disk within recv_resync_read.
1891                 * corresponding put_ldev done below on error,
1892                 * or in drbd_peer_request_endio. */
1893                err = recv_resync_read(peer_device, sector, pi);
1894        } else {
1895                if (__ratelimit(&drbd_ratelimit_state))
1896                        drbd_err(device, "Can not write resync data to local disk.\n");
1897
1898                err = drbd_drain_block(peer_device, pi->size);
1899
1900                drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
1901        }
1902
1903        atomic_add(pi->size >> 9, &device->rs_sect_in);
1904
1905        return err;
1906}
1907
1908static void restart_conflicting_writes(struct drbd_device *device,
1909                                       sector_t sector, int size)
1910{
1911        struct drbd_interval *i;
1912        struct drbd_request *req;
1913
1914        drbd_for_each_overlap(i, &device->write_requests, sector, size) {
1915                if (!i->local)
1916                        continue;
1917                req = container_of(i, struct drbd_request, i);
1918                if (req->rq_state & RQ_LOCAL_PENDING ||
1919                    !(req->rq_state & RQ_POSTPONED))
1920                        continue;
1921                /* as it is RQ_POSTPONED, this will cause it to
1922                 * be queued on the retry workqueue. */
1923                __req_mod(req, CONFLICT_RESOLVED, NULL);
1924        }
1925}
1926
1927/*
1928 * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1929 */
1930static int e_end_block(struct drbd_work *w, int cancel)
1931{
1932        struct drbd_peer_request *peer_req =
1933                container_of(w, struct drbd_peer_request, w);
1934        struct drbd_peer_device *peer_device = peer_req->peer_device;
1935        struct drbd_device *device = peer_device->device;
1936        sector_t sector = peer_req->i.sector;
1937        int err = 0, pcmd;
1938
1939        if (peer_req->flags & EE_SEND_WRITE_ACK) {
1940                if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1941                        pcmd = (device->state.conn >= C_SYNC_SOURCE &&
1942                                device->state.conn <= C_PAUSED_SYNC_T &&
1943                                peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1944                                P_RS_WRITE_ACK : P_WRITE_ACK;
1945                        err = drbd_send_ack(peer_device, pcmd, peer_req);
1946                        if (pcmd == P_RS_WRITE_ACK)
1947                                drbd_set_in_sync(device, sector, peer_req->i.size);
1948                } else {
1949                        err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1950                        /* we expect it to be marked out of sync anyways...
1951                         * maybe assert this?  */
1952                }
1953                dec_unacked(device);
1954        }
1955
1956        /* we delete from the conflict detection hash _after_ we sent out the
1957         * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1958        if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1959                spin_lock_irq(&device->resource->req_lock);
1960                D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
1961                drbd_remove_epoch_entry_interval(device, peer_req);
1962                if (peer_req->flags & EE_RESTART_REQUESTS)
1963                        restart_conflicting_writes(device, sector, peer_req->i.size);
1964                spin_unlock_irq(&device->resource->req_lock);
1965        } else
1966                D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1967
1968        drbd_may_finish_epoch(first_peer_device(device)->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1969
1970        return err;
1971}
1972
1973static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1974{
1975        struct drbd_peer_request *peer_req =
1976                container_of(w, struct drbd_peer_request, w);
1977        struct drbd_peer_device *peer_device = peer_req->peer_device;
1978        int err;
1979
1980        err = drbd_send_ack(peer_device, ack, peer_req);
1981        dec_unacked(peer_device->device);
1982
1983        return err;
1984}
1985
1986static int e_send_superseded(struct drbd_work *w, int unused)
1987{
1988        return e_send_ack(w, P_SUPERSEDED);
1989}
1990
1991static int e_send_retry_write(struct drbd_work *w, int unused)
1992{
1993        struct drbd_peer_request *peer_req =
1994                container_of(w, struct drbd_peer_request, w);
1995        struct drbd_connection *connection = peer_req->peer_device->connection;
1996
1997        return e_send_ack(w, connection->agreed_pro_version >= 100 ?
1998                             P_RETRY_WRITE : P_SUPERSEDED);
1999}
2000
/*
 * seq_greater() - wrap-around aware "a is newer than b" for sequence numbers.
 *
 * Returns true if @a is ahead of @b by less than half the 32-bit number
 * space.
 */
static bool seq_greater(u32 a, u32 b)
{
        /*
         * We assume 32-bit wrap-around here.
         * For 24-bit wrap-around, we would have to shift:
         *  a <<= 8; b <<= 8;
         *
         * Subtract as unsigned and only then reinterpret the difference as
         * signed: unsigned arithmetic wraps by definition, whereas
         * subtracting two s32 values may overflow.
         */
        return (s32)(a - b) > 0;
}
2010
2011static u32 seq_max(u32 a, u32 b)
2012{
2013        return seq_greater(a, b) ? a : b;
2014}
2015
2016static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2017{
2018        struct drbd_device *device = peer_device->device;
2019        unsigned int newest_peer_seq;
2020
2021        if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2022                spin_lock(&device->peer_seq_lock);
2023                newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2024                device->peer_seq = newest_peer_seq;
2025                spin_unlock(&device->peer_seq_lock);
2026                /* wake up only if we actually changed device->peer_seq */
2027                if (peer_seq == newest_peer_seq)
2028                        wake_up(&device->seq_wait);
2029        }
2030}
2031
/*
 * overlaps() - do two extents share at least one sector?
 * @s1, @s2: start sectors
 * @l1, @l2: lengths in bytes (shifted right by 9 to get sectors)
 *
 * Returns 1 if [s1, s1 + l1/512) and [s2, s2 + l2/512) intersect, else 0.
 */
static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
{
        /* each extent has to end behind the start of the other one */
        return (s1 + (l1 >> 9) > s2) && (s2 + (l2 >> 9) > s1);
}
2036
2037/* maybe change sync_ee into interval trees as well? */
2038static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2039{
2040        struct drbd_peer_request *rs_req;
2041        bool rv = 0;
2042
2043        spin_lock_irq(&device->resource->req_lock);
2044        list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2045                if (overlaps(peer_req->i.sector, peer_req->i.size,
2046                             rs_req->i.sector, rs_req->i.size)) {
2047                        rv = 1;
2048                        break;
2049                }
2050        }
2051        spin_unlock_irq(&device->resource->req_lock);
2052
2053        return rv;
2054}
2055
2056/* Called from receive_Data.
2057 * Synchronize packets on sock with packets on msock.
2058 *
2059 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2060 * packet traveling on msock, they are still processed in the order they have
2061 * been sent.
2062 *
2063 * Note: we don't care for Ack packets overtaking P_DATA packets.
2064 *
2065 * In case packet_seq is larger than device->peer_seq number, there are
2066 * outstanding packets on the msock. We wait for them to arrive.
2067 * In case we are the logically next packet, we update device->peer_seq
2068 * ourselves. Correctly handles 32bit wrap around.
2069 *
2070 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2071 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2072 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2073 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
2074 *
2075 * returns 0 if we may process the packet,
2076 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2077static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2078{
2079        struct drbd_device *device = peer_device->device;
2080        DEFINE_WAIT(wait);
2081        long timeout;
2082        int ret = 0, tp;
2083
2084        if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2085                return 0;
2086
2087        spin_lock(&device->peer_seq_lock);
2088        for (;;) {
2089                if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2090                        device->peer_seq = seq_max(device->peer_seq, peer_seq);
2091                        break;
2092                }
2093
2094                if (signal_pending(current)) {
2095                        ret = -ERESTARTSYS;
2096                        break;
2097                }
2098
2099                rcu_read_lock();
2100                tp = rcu_dereference(first_peer_device(device)->connection->net_conf)->two_primaries;
2101                rcu_read_unlock();
2102
2103                if (!tp)
2104                        break;
2105
2106                /* Only need to wait if two_primaries is enabled */
2107                prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2108                spin_unlock(&device->peer_seq_lock);
2109                rcu_read_lock();
2110                timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2111                rcu_read_unlock();
2112                timeout = schedule_timeout(timeout);
2113                spin_lock(&device->peer_seq_lock);
2114                if (!timeout) {
2115                        ret = -ETIMEDOUT;
2116                        drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2117                        break;
2118                }
2119        }
2120        spin_unlock(&device->peer_seq_lock);
2121        finish_wait(&device->seq_wait, &wait);
2122        return ret;
2123}
2124
2125/* see also bio_flags_to_wire()
2126 * DRBD_REQ_*, because we need to semantically map the flags to data packet
2127 * flags and back. We may replicate to other kernel versions. */
2128static unsigned long wire_flags_to_bio(u32 dpf)
2129{
2130        return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2131                (dpf & DP_FUA ? REQ_FUA : 0) |
2132                (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2133                (dpf & DP_DISCARD ? REQ_DISCARD : 0);
2134}
2135
2136static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2137                                    unsigned int size)
2138{
2139        struct drbd_interval *i;
2140
2141    repeat:
2142        drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2143                struct drbd_request *req;
2144                struct bio_and_error m;
2145
2146                if (!i->local)
2147                        continue;
2148                req = container_of(i, struct drbd_request, i);
2149                if (!(req->rq_state & RQ_POSTPONED))
2150                        continue;
2151                req->rq_state &= ~RQ_POSTPONED;
2152                __req_mod(req, NEG_ACKED, &m);
2153                spin_unlock_irq(&device->resource->req_lock);
2154                if (m.bio)
2155                        complete_master_bio(device, &m);
2156                spin_lock_irq(&device->resource->req_lock);
2157                goto repeat;
2158        }
2159}
2160
2161static int handle_write_conflicts(struct drbd_device *device,
2162                                  struct drbd_peer_request *peer_req)
2163{
2164        struct drbd_connection *connection = peer_req->peer_device->connection;
2165        bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2166        sector_t sector = peer_req->i.sector;
2167        const unsigned int size = peer_req->i.size;
2168        struct drbd_interval *i;
2169        bool equal;
2170        int err;
2171
2172        /*
2173         * Inserting the peer request into the write_requests tree will prevent
2174         * new conflicting local requests from being added.
2175         */
2176        drbd_insert_interval(&device->write_requests, &peer_req->i);
2177
2178    repeat:
2179        drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2180                if (i == &peer_req->i)
2181                        continue;
2182                if (i->completed)
2183                        continue;
2184
2185                if (!i->local) {
2186                        /*
2187                         * Our peer has sent a conflicting remote request; this
2188                         * should not happen in a two-node setup.  Wait for the
2189                         * earlier peer request to complete.
2190                         */
2191                        err = drbd_wait_misc(device, i);
2192                        if (err)
2193                                goto out;
2194                        goto repeat;
2195                }
2196
2197                equal = i->sector == sector && i->size == size;
2198                if (resolve_conflicts) {
2199                        /*
2200                         * If the peer request is fully contained within the
2201                         * overlapping request, it can be considered overwritten
2202                         * and thus superseded; otherwise, it will be retried
2203                         * once all overlapping requests have completed.
2204                         */
2205                        bool superseded = i->sector <= sector && i->sector +
2206                                       (i->size >> 9) >= sector + (size >> 9);
2207
2208                        if (!equal)
2209                                drbd_alert(device, "Concurrent writes detected: "
2210                                               "local=%llus +%u, remote=%llus +%u, "
2211                                               "assuming %s came first\n",
2212                                          (unsigned long long)i->sector, i->size,
2213                                          (unsigned long long)sector, size,
2214                                          superseded ? "local" : "remote");
2215
2216                        peer_req->w.cb = superseded ? e_send_superseded :
2217                                                   e_send_retry_write;
2218                        list_add_tail(&peer_req->w.list, &device->done_ee);
2219                        wake_asender(connection);
2220
2221                        err = -ENOENT;
2222                        goto out;
2223                } else {
2224                        struct drbd_request *req =
2225                                container_of(i, struct drbd_request, i);
2226
2227                        if (!equal)
2228                                drbd_alert(device, "Concurrent writes detected: "
2229                                               "local=%llus +%u, remote=%llus +%u\n",
2230                                          (unsigned long long)i->sector, i->size,
2231                                          (unsigned long long)sector, size);
2232
2233                        if (req->rq_state & RQ_LOCAL_PENDING ||
2234                            !(req->rq_state & RQ_POSTPONED)) {
2235                                /*
2236                                 * Wait for the node with the discard flag to
2237                                 * decide if this request has been superseded
2238                                 * or needs to be retried.
2239                                 * Requests that have been superseded will
2240                                 * disappear from the write_requests tree.
2241                                 *
2242                                 * In addition, wait for the conflicting
2243                                 * request to finish locally before submitting
2244                                 * the conflicting peer request.
2245                                 */
2246                                err = drbd_wait_misc(device, &req->i);
2247                                if (err) {
2248                                        _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2249                                        fail_postponed_requests(device, sector, size);
2250                                        goto out;
2251                                }
2252                                goto repeat;
2253                        }
2254                        /*
2255                         * Remember to restart the conflicting requests after
2256                         * the new peer request has completed.
2257                         */
2258                        peer_req->flags |= EE_RESTART_REQUESTS;
2259                }
2260        }
2261        err = 0;
2262
2263    out:
2264        if (err)
2265                drbd_remove_epoch_entry_interval(device, peer_req);
2266        return err;
2267}
2268
2269/* mirrored write */
2270static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2271{
2272        struct drbd_peer_device *peer_device;
2273        struct drbd_device *device;
2274        struct net_conf *nc;
2275        sector_t sector;
2276        struct drbd_peer_request *peer_req;
2277        struct p_data *p = pi->data;
2278        u32 peer_seq = be32_to_cpu(p->seq_num);
2279        int rw = WRITE;
2280        u32 dp_flags;
2281        int err, tp;
2282
2283        peer_device = conn_peer_device(connection, pi->vnr);
2284        if (!peer_device)
2285                return -EIO;
2286        device = peer_device->device;
2287
2288        if (!get_ldev(device)) {
2289                int err2;
2290
2291                err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2292                drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2293                atomic_inc(&connection->current_epoch->epoch_size);
2294                err2 = drbd_drain_block(peer_device, pi->size);
2295                if (!err)
2296                        err = err2;
2297                return err;
2298        }
2299
2300        /*
2301         * Corresponding put_ldev done either below (on various errors), or in
2302         * drbd_peer_request_endio, if we successfully submit the data at the
2303         * end of this function.
2304         */
2305
2306        sector = be64_to_cpu(p->sector);
2307        peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2308        if (!peer_req) {
2309                put_ldev(device);
2310                return -EIO;
2311        }
2312
2313        peer_req->w.cb = e_end_block;
2314        peer_req->submit_jif = jiffies;
2315        peer_req->flags |= EE_APPLICATION;
2316
2317        dp_flags = be32_to_cpu(p->dp_flags);
2318        rw |= wire_flags_to_bio(dp_flags);
2319        if (pi->cmd == P_TRIM) {
2320                struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
2321                peer_req->flags |= EE_IS_TRIM;
2322                if (!blk_queue_discard(q))
2323                        peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT;
2324                D_ASSERT(peer_device, peer_req->i.size > 0);
2325                D_ASSERT(peer_device, rw & REQ_DISCARD);
2326                D_ASSERT(peer_device, peer_req->pages == NULL);
2327        } else if (peer_req->pages == NULL) {
2328                D_ASSERT(device, peer_req->i.size == 0);
2329                D_ASSERT(device, dp_flags & DP_FLUSH);
2330        }
2331
2332        if (dp_flags & DP_MAY_SET_IN_SYNC)
2333                peer_req->flags |= EE_MAY_SET_IN_SYNC;
2334
2335        spin_lock(&connection->epoch_lock);
2336        peer_req->epoch = connection->current_epoch;
2337        atomic_inc(&peer_req->epoch->epoch_size);
2338        atomic_inc(&peer_req->epoch->active);
2339        spin_unlock(&connection->epoch_lock);
2340
2341        rcu_read_lock();
2342        nc = rcu_dereference(peer_device->connection->net_conf);
2343        tp = nc->two_primaries;
2344        if (peer_device->connection->agreed_pro_version < 100) {
2345                switch (nc->wire_protocol) {
2346                case DRBD_PROT_C:
2347                        dp_flags |= DP_SEND_WRITE_ACK;
2348                        break;
2349                case DRBD_PROT_B:
2350                        dp_flags |= DP_SEND_RECEIVE_ACK;
2351                        break;
2352                }
2353        }
2354        rcu_read_unlock();
2355
2356        if (dp_flags & DP_SEND_WRITE_ACK) {
2357                peer_req->flags |= EE_SEND_WRITE_ACK;
2358                inc_unacked(device);
2359                /* corresponding dec_unacked() in e_end_block()
2360                 * respective _drbd_clear_done_ee */
2361        }
2362
2363        if (dp_flags & DP_SEND_RECEIVE_ACK) {
2364                /* I really don't like it that the receiver thread
2365                 * sends on the msock, but anyways */
2366                drbd_send_ack(first_peer_device(device), P_RECV_ACK, peer_req);
2367        }
2368
2369        if (tp) {
2370                /* two primaries implies protocol C */
2371                D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2372                peer_req->flags |= EE_IN_INTERVAL_TREE;
2373                err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2374                if (err)
2375                        goto out_interrupted;
2376                spin_lock_irq(&device->resource->req_lock);
2377                err = handle_write_conflicts(device, peer_req);
2378                if (err) {
2379                        spin_unlock_irq(&device->resource->req_lock);
2380                        if (err == -ENOENT) {
2381                                put_ldev(device);
2382                                return 0;
2383                        }
2384                        goto out_interrupted;
2385                }
2386        } else {
2387                update_peer_seq(peer_device, peer_seq);
2388                spin_lock_irq(&device->resource->req_lock);
2389        }
2390        /* if we use the zeroout fallback code, we process synchronously
2391         * and we wait for all pending requests, respectively wait for
2392         * active_ee to become empty in drbd_submit_peer_request();
2393         * better not add ourselves here. */
2394        if ((peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) == 0)
2395                list_add_tail(&peer_req->w.list, &device->active_ee);
2396        spin_unlock_irq(&device->resource->req_lock);
2397
2398        if (device->state.conn == C_SYNC_TARGET)
2399                wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2400
2401        if (device->state.pdsk < D_INCONSISTENT) {
2402                /* In case we have the only disk of the cluster, */
2403                drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2404                peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2405                drbd_al_begin_io(device, &peer_req->i);
2406                peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2407        }
2408
2409        err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR);
2410        if (!err)
2411                return 0;
2412
2413        /* don't care for the reason here */
2414        drbd_err(device, "submit failed, triggering re-connect\n");
2415        spin_lock_irq(&device->resource->req_lock);
2416        list_del(&peer_req->w.list);
2417        drbd_remove_epoch_entry_interval(device, peer_req);
2418        spin_unlock_irq(&device->resource->req_lock);
2419        if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2420                peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2421                drbd_al_complete_io(device, &peer_req->i);
2422        }
2423
2424out_interrupted:
2425        drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
2426        put_ldev(device);
2427        drbd_free_peer_req(device, peer_req);
2428        return err;
2429}
2430
2431/* We may throttle resync, if the lower device seems to be busy,
2432 * and current sync rate is above c_min_rate.
2433 *
2434 * To decide whether or not the lower device is busy, we use a scheme similar
2435 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2436 * (more than 64 sectors) of activity we cannot account for with our own resync
2437 * activity, it obviously is "busy".
2438 *
2439 * The current sync rate used here uses only the most recent two step marks,
2440 * to have a short time average so we can react faster.
2441 */
2442bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2443                bool throttle_if_app_is_waiting)
2444{
2445        struct lc_element *tmp;
2446        bool throttle = drbd_rs_c_min_rate_throttle(device);
2447
2448        if (!throttle || throttle_if_app_is_waiting)
2449                return throttle;
2450
2451        spin_lock_irq(&device->al_lock);
2452        tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2453        if (tmp) {
2454                struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2455                if (test_bit(BME_PRIORITY, &bm_ext->flags))
2456                        throttle = false;
2457                /* Do not slow down if app IO is already waiting for this extent,
2458                 * and our progress is necessary for application IO to complete. */
2459        }
2460        spin_unlock_irq(&device->al_lock);
2461
2462        return throttle;
2463}
2464
2465bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2466{
2467        struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2468        unsigned long db, dt, dbdt;
2469        unsigned int c_min_rate;
2470        int curr_events;
2471
2472        rcu_read_lock();
2473        c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2474        rcu_read_unlock();
2475
2476        /* feature disabled? */
2477        if (c_min_rate == 0)
2478                return false;
2479
2480        curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2481                      (int)part_stat_read(&disk->part0, sectors[1]) -
2482                        atomic_read(&device->rs_sect_ev);
2483
2484        if (atomic_read(&device->ap_actlog_cnt)
2485            || curr_events - device->rs_last_events > 64) {
2486                unsigned long rs_left;
2487                int i;
2488
2489                device->rs_last_events = curr_events;
2490
2491                /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2492                 * approx. */
2493                i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2494
2495                if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2496                        rs_left = device->ov_left;
2497                else
2498                        rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2499
2500                dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2501                if (!dt)
2502                        dt++;
2503                db = device->rs_mark_left[i] - rs_left;
2504                dbdt = Bit2KB(db/dt);
2505
2506                if (dbdt > c_min_rate)
2507                        return true;
2508        }
2509        return false;
2510}
2511
2512static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2513{
2514        struct drbd_peer_device *peer_device;
2515        struct drbd_device *device;
2516        sector_t sector;
2517        sector_t capacity;
2518        struct drbd_peer_request *peer_req;
2519        struct digest_info *di = NULL;
2520        int size, verb;
2521        unsigned int fault_type;
2522        struct p_block_req *p = pi->data;
2523
2524        peer_device = conn_peer_device(connection, pi->vnr);
2525        if (!peer_device)
2526                return -EIO;
2527        device = peer_device->device;
2528        capacity = drbd_get_capacity(device->this_bdev);
2529
2530        sector = be64_to_cpu(p->sector);
2531        size   = be32_to_cpu(p->blksize);
2532
2533        if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2534                drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2535                                (unsigned long long)sector, size);
2536                return -EINVAL;
2537        }
2538        if (sector + (size>>9) > capacity) {
2539                drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2540                                (unsigned long long)sector, size);
2541                return -EINVAL;
2542        }
2543
2544        if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2545                verb = 1;
2546                switch (pi->cmd) {
2547                case P_DATA_REQUEST:
2548                        drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2549                        break;
2550                case P_RS_DATA_REQUEST:
2551                case P_CSUM_RS_REQUEST:
2552                case P_OV_REQUEST:
2553                        drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2554                        break;
2555                case P_OV_REPLY:
2556                        verb = 0;
2557                        dec_rs_pending(device);
2558                        drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2559                        break;
2560                default:
2561                        BUG();
2562                }
2563                if (verb && __ratelimit(&drbd_ratelimit_state))
2564                        drbd_err(device, "Can not satisfy peer's read request, "
2565                            "no local data.\n");
2566
2567                /* drain possibly payload */
2568                return drbd_drain_block(peer_device, pi->size);
2569        }
2570
2571        /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2572         * "criss-cross" setup, that might cause write-out on some other DRBD,
2573         * which in turn might block on the other node at this very place.  */
2574        peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2575                        true /* has real payload */, GFP_NOIO);
2576        if (!peer_req) {
2577                put_ldev(device);
2578                return -ENOMEM;
2579        }
2580
2581        switch (pi->cmd) {
2582        case P_DATA_REQUEST:
2583                peer_req->w.cb = w_e_end_data_req;
2584                fault_type = DRBD_FAULT_DT_RD;
2585                /* application IO, don't drbd_rs_begin_io */
2586                peer_req->flags |= EE_APPLICATION;
2587                goto submit;
2588
2589        case P_RS_DATA_REQUEST:
2590                peer_req->w.cb = w_e_end_rsdata_req;
2591                fault_type = DRBD_FAULT_RS_RD;
2592                /* used in the sector offset progress display */
2593                device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2594                break;
2595
2596        case P_OV_REPLY:
2597        case P_CSUM_RS_REQUEST:
2598                fault_type = DRBD_FAULT_RS_RD;
2599                di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2600                if (!di)
2601                        goto out_free_e;
2602
2603                di->digest_size = pi->size;
2604                di->digest = (((char *)di)+sizeof(struct digest_info));
2605
2606                peer_req->digest = di;
2607                peer_req->flags |= EE_HAS_DIGEST;
2608
2609                if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2610                        goto out_free_e;
2611
2612                if (pi->cmd == P_CSUM_RS_REQUEST) {
2613                        D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2614                        peer_req->w.cb = w_e_end_csum_rs_req;
2615                        /* used in the sector offset progress display */
2616                        device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2617                        /* remember to report stats in drbd_resync_finished */
2618                        device->use_csums = true;
2619                } else if (pi->cmd == P_OV_REPLY) {
2620                        /* track progress, we may need to throttle */
2621                        atomic_add(size >> 9, &device->rs_sect_in);
2622                        peer_req->w.cb = w_e_end_ov_reply;
2623                        dec_rs_pending(device);
2624                        /* drbd_rs_begin_io done when we sent this request,
2625                         * but accounting still needs to be done. */
2626                        goto submit_for_resync;
2627                }
2628                break;
2629
2630        case P_OV_REQUEST:
2631                if (device->ov_start_sector == ~(sector_t)0 &&
2632                    peer_device->connection->agreed_pro_version >= 90) {
2633                        unsigned long now = jiffies;
2634                        int i;
2635                        device->ov_start_sector = sector;
2636                        device->ov_position = sector;
2637                        device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2638                        device->rs_total = device->ov_left;
2639                        for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2640                                device->rs_mark_left[i] = device->ov_left;
2641                                device->rs_mark_time[i] = now;
2642                        }
2643                        drbd_info(device, "Online Verify start sector: %llu\n",
2644                                        (unsigned long long)sector);
2645                }
2646                peer_req->w.cb = w_e_end_ov_req;
2647                fault_type = DRBD_FAULT_RS_RD;
2648                break;
2649
2650        default:
2651                BUG();
2652        }
2653
2654        /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2655         * wrt the receiver, but it is not as straightforward as it may seem.
2656         * Various places in the resync start and stop logic assume resync
2657         * requests are processed in order, requeuing this on the worker thread
2658         * introduces a bunch of new code for synchronization between threads.
2659         *
2660         * Unlimited throttling before drbd_rs_begin_io may stall the resync
2661         * "forever", throttling after drbd_rs_begin_io will lock that extent
2662         * for application writes for the same time.  For now, just throttle
2663         * here, where the rest of the code expects the receiver to sleep for
2664         * a while, anyways.
2665         */
2666
2667        /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2668         * this defers syncer requests for some time, before letting at least
2669         * on request through.  The resync controller on the receiving side
2670         * will adapt to the incoming rate accordingly.
2671         *
2672         * We cannot throttle here if remote is Primary/SyncTarget:
2673         * we would also throttle its application reads.
2674         * In that case, throttling is done on the SyncTarget only.
2675         */
2676
2677        /* Even though this may be a resync request, we do add to "read_ee";
2678         * "sync_ee" is only used for resync WRITEs.
2679         * Add to list early, so debugfs can find this request
2680         * even if we have to sleep below. */
2681        spin_lock_irq(&device->resource->req_lock);
2682        list_add_tail(&peer_req->w.list, &device->read_ee);
2683        spin_unlock_irq(&device->resource->req_lock);
2684
2685        update_receiver_timing_details(connection, drbd_rs_should_slow_down);
2686        if (device->state.peer != R_PRIMARY
2687        && drbd_rs_should_slow_down(device, sector, false))
2688                schedule_timeout_uninterruptible(HZ/10);
2689        update_receiver_timing_details(connection, drbd_rs_begin_io);
2690        if (drbd_rs_begin_io(device, sector))
2691                goto out_free_e;
2692
2693submit_for_resync:
2694        atomic_add(size >> 9, &device->rs_sect_ev);
2695
2696submit:
2697        update_receiver_timing_details(connection, drbd_submit_peer_request);
2698        inc_unacked(device);
2699        if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0)
2700                return 0;
2701
2702        /* don't care for the reason here */
2703        drbd_err(device, "submit failed, triggering re-connect\n");
2704
2705out_free_e:
2706        spin_lock_irq(&device->resource->req_lock);
2707        list_del(&peer_req->w.list);
2708        spin_unlock_irq(&device->resource->req_lock);
2709        /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2710
2711        put_ldev(device);
2712        drbd_free_peer_req(device, peer_req);
2713        return -EIO;
2714}
2715
2716/**
2717 * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
2718 */
2719static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2720{
2721        struct drbd_device *device = peer_device->device;
2722        int self, peer, rv = -100;
2723        unsigned long ch_self, ch_peer;
2724        enum drbd_after_sb_p after_sb_0p;
2725
2726        self = device->ldev->md.uuid[UI_BITMAP] & 1;
2727        peer = device->p_uuid[UI_BITMAP] & 1;
2728
2729        ch_peer = device->p_uuid[UI_SIZE];
2730        ch_self = device->comm_bm_set;
2731
2732        rcu_read_lock();
2733        after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2734        rcu_read_unlock();
2735        switch (after_sb_0p) {
2736        case ASB_CONSENSUS:
2737        case ASB_DISCARD_SECONDARY:
2738        case ASB_CALL_HELPER:
2739        case ASB_VIOLENTLY:
2740                drbd_err(device, "Configuration error.\n");
2741                break;
2742        case ASB_DISCONNECT:
2743                break;
2744        case ASB_DISCARD_YOUNGER_PRI:
2745                if (self == 0 && peer == 1) {
2746                        rv = -1;
2747                        break;
2748                }
2749                if (self == 1 && peer == 0) {
2750                        rv =  1;
2751                        break;
2752                }
2753                /* Else fall through to one of the other strategies... */
2754        case ASB_DISCARD_OLDER_PRI:
2755                if (self == 0 && peer == 1) {
2756                        rv = 1;
2757                        break;
2758                }
2759                if (self == 1 && peer == 0) {
2760                        rv = -1;
2761                        break;
2762                }
2763                /* Else fall through to one of the other strategies... */
2764                drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2765                     "Using discard-least-changes instead\n");
2766        case ASB_DISCARD_ZERO_CHG:
2767                if (ch_peer == 0 && ch_self == 0) {
2768                        rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2769                                ? -1 : 1;
2770                        break;
2771                } else {
2772                        if (ch_peer == 0) { rv =  1; break; }
2773                        if (ch_self == 0) { rv = -1; break; }
2774                }
2775                if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2776                        break;
2777        case ASB_DISCARD_LEAST_CHG:
2778                if      (ch_self < ch_peer)
2779                        rv = -1;
2780                else if (ch_self > ch_peer)
2781                        rv =  1;
2782                else /* ( ch_self == ch_peer ) */
2783                     /* Well, then use something else. */
2784                        rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2785                                ? -1 : 1;
2786                break;
2787        case ASB_DISCARD_LOCAL:
2788                rv = -1;
2789                break;
2790        case ASB_DISCARD_REMOTE:
2791                rv =  1;
2792        }
2793
2794        return rv;
2795}
2796
2797/**
2798 * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
2799 */
2800static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
2801{
2802        struct drbd_device *device = peer_device->device;
2803        int hg, rv = -100;
2804        enum drbd_after_sb_p after_sb_1p;
2805
2806        rcu_read_lock();
2807        after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
2808        rcu_read_unlock();
2809        switch (after_sb_1p) {
2810        case ASB_DISCARD_YOUNGER_PRI:
2811        case ASB_DISCARD_OLDER_PRI:
2812        case ASB_DISCARD_LEAST_CHG:
2813        case ASB_DISCARD_LOCAL:
2814        case ASB_DISCARD_REMOTE:
2815        case ASB_DISCARD_ZERO_CHG:
2816                drbd_err(device, "Configuration error.\n");
2817                break;
2818        case ASB_DISCONNECT:
2819                break;
2820        case ASB_CONSENSUS:
2821                hg = drbd_asb_recover_0p(peer_device);
2822                if (hg == -1 && device->state.role == R_SECONDARY)
2823                        rv = hg;
2824                if (hg == 1  && device->state.role == R_PRIMARY)
2825                        rv = hg;
2826                break;
2827        case ASB_VIOLENTLY:
2828                rv = drbd_asb_recover_0p(peer_device);
2829                break;
2830        case ASB_DISCARD_SECONDARY:
2831                return device->state.role == R_PRIMARY ? 1 : -1;
2832        case ASB_CALL_HELPER:
2833                hg = drbd_asb_recover_0p(peer_device);
2834                if (hg == -1 && device->state.role == R_PRIMARY) {
2835                        enum drbd_state_rv rv2;
2836
2837                         /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2838                          * we might be here in C_WF_REPORT_PARAMS which is transient.
2839                          * we do not need to wait for the after state change work either. */
2840                        rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2841                        if (rv2 != SS_SUCCESS) {
2842                                drbd_khelper(device, "pri-lost-after-sb");
2843                        } else {
2844                                drbd_warn(device, "Successfully gave up primary role.\n");
2845                                rv = hg;
2846                        }
2847                } else
2848                        rv = hg;
2849        }
2850
2851        return rv;
2852}
2853
2854/**
2855 * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
2856 */
2857static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
2858{
2859        struct drbd_device *device = peer_device->device;
2860        int hg, rv = -100;
2861        enum drbd_after_sb_p after_sb_2p;
2862
2863        rcu_read_lock();
2864        after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
2865        rcu_read_unlock();
2866        switch (after_sb_2p) {
2867        case ASB_DISCARD_YOUNGER_PRI:
2868        case ASB_DISCARD_OLDER_PRI:
2869        case ASB_DISCARD_LEAST_CHG:
2870        case ASB_DISCARD_LOCAL:
2871        case ASB_DISCARD_REMOTE:
2872        case ASB_CONSENSUS:
2873        case ASB_DISCARD_SECONDARY:
2874        case ASB_DISCARD_ZERO_CHG:
2875                drbd_err(device, "Configuration error.\n");
2876                break;
2877        case ASB_VIOLENTLY:
2878                rv = drbd_asb_recover_0p(peer_device);
2879                break;
2880        case ASB_DISCONNECT:
2881                break;
2882        case ASB_CALL_HELPER:
2883                hg = drbd_asb_recover_0p(peer_device);
2884                if (hg == -1) {
2885                        enum drbd_state_rv rv2;
2886
2887                         /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2888                          * we might be here in C_WF_REPORT_PARAMS which is transient.
2889                          * we do not need to wait for the after state change work either. */
2890                        rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2891                        if (rv2 != SS_SUCCESS) {
2892                                drbd_khelper(device, "pri-lost-after-sb");
2893                        } else {
2894                                drbd_warn(device, "Successfully gave up primary role.\n");
2895                                rv = hg;
2896                        }
2897                } else
2898                        rv = hg;
2899        }
2900
2901        return rv;
2902}
2903
2904static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
2905                           u64 bits, u64 flags)
2906{
2907        if (!uuid) {
2908                drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
2909                return;
2910        }
2911        drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2912             text,
2913             (unsigned long long)uuid[UI_CURRENT],
2914             (unsigned long long)uuid[UI_BITMAP],
2915             (unsigned long long)uuid[UI_HISTORY_START],
2916             (unsigned long long)uuid[UI_HISTORY_END],
2917             (unsigned long long)bits,
2918             (unsigned long long)flags);
2919}
2920
2921/*
2922  100   after split brain try auto recover
2923    2   C_SYNC_SOURCE set BitMap
2924    1   C_SYNC_SOURCE use BitMap
2925    0   no Sync
2926   -1   C_SYNC_TARGET use BitMap
2927   -2   C_SYNC_TARGET set BitMap
2928 -100   after split brain, disconnect
2929-1000   unrelated data
2930-1091   requires proto 91
2931-1096   requires proto 96
2932 */
2933static int drbd_uuid_compare(struct drbd_device *const device, int *rule_nr) __must_hold(local)
2934{
2935        struct drbd_peer_device *const peer_device = first_peer_device(device);
2936        struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
2937        u64 self, peer;
2938        int i, j;
2939
2940        self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2941        peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2942
2943        *rule_nr = 10;
2944        if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2945                return 0;
2946
2947        *rule_nr = 20;
2948        if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2949             peer != UUID_JUST_CREATED)
2950                return -2;
2951
2952        *rule_nr = 30;
2953        if (self != UUID_JUST_CREATED &&
2954            (peer == UUID_JUST_CREATED || peer == (u64)0))
2955                return 2;
2956
2957        if (self == peer) {
2958                int rct, dc; /* roles at crash time */
2959
2960                if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2961
2962                        if (connection->agreed_pro_version < 91)
2963                                return -1091;
2964
2965                        if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2966                            (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2967                                drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
2968                                drbd_uuid_move_history(device);
2969                                device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
2970                                device->ldev->md.uuid[UI_BITMAP] = 0;
2971
2972                                drbd_uuid_dump(device, "self", device->ldev->md.uuid,
2973                                               device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
2974                                *rule_nr = 34;
2975                        } else {
2976                                drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
2977                                *rule_nr = 36;
2978                        }
2979
2980                        return 1;
2981                }
2982
2983                if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
2984
2985                        if (connection->agreed_pro_version < 91)
2986                                return -1091;
2987
2988                        if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2989                            (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2990                                drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2991
2992                                device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
2993                                device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
2994                                device->p_uuid[UI_BITMAP] = 0UL;
2995
2996                                drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
2997                                *rule_nr = 35;
2998                        } else {
2999                                drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3000                                *rule_nr = 37;
3001                        }
3002
3003                        return -1;
3004                }
3005
3006                /* Common power [off|failure] */
3007                rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3008                        (device->p_uuid[UI_FLAGS] & 2);
3009                /* lowest bit is set when we were primary,
3010                 * next bit (weight 2) is set when peer was primary */
3011                *rule_nr = 40;
3012
3013                switch (rct) {
3014                case 0: /* !self_pri && !peer_pri */ return 0;
3015                case 1: /*  self_pri && !peer_pri */ return 1;
3016                case 2: /* !self_pri &&  peer_pri */ return -1;
3017                case 3: /*  self_pri &&  peer_pri */
3018                        dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3019                        return dc ? -1 : 1;
3020                }
3021        }
3022
3023        *rule_nr = 50;
3024        peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3025        if (self == peer)
3026                return -1;
3027
3028        *rule_nr = 51;
3029        peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3030        if (self == peer) {
3031                if (connection->agreed_pro_version < 96 ?
3032                    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3033                    (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3034                    peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3035                        /* The last P_SYNC_UUID did not get though. Undo the last start of
3036                           resync as sync source modifications of the peer's UUIDs. */
3037
3038                        if (connection->agreed_pro_version < 91)
3039                                return -1091;
3040
3041                        device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3042                        device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3043
3044                        drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3045                        drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3046
3047                        return -1;
3048                }
3049        }
3050
3051        *rule_nr = 60;
3052        self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3053        for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3054                peer = device->p_uuid[i] & ~((u64)1);
3055                if (self == peer)
3056                        return -2;
3057        }
3058
3059        *rule_nr = 70;
3060        self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3061        peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3062        if (self == peer)
3063                return 1;
3064
3065        *rule_nr = 71;
3066        self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3067        if (self == peer) {
3068                if (connection->agreed_pro_version < 96 ?
3069                    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3070                    (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3071                    self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3072                        /* The last P_SYNC_UUID did not get though. Undo the last start of
3073                           resync as sync source modifications of our UUIDs. */
3074
3075                        if (connection->agreed_pro_version < 91)
3076                                return -1091;
3077
3078                        __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3079                        __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3080
3081                        drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3082                        drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3083                                       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3084
3085                        return 1;
3086                }
3087        }
3088
3089
3090        *rule_nr = 80;
3091        peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3092        for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3093                self = device->ldev->md.uuid[i] & ~((u64)1);
3094                if (self == peer)
3095                        return 2;
3096        }
3097
3098        *rule_nr = 90;
3099        self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3100        peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3101        if (self == peer && self != ((u64)0))
3102                return 100;
3103
3104        *rule_nr = 100;
3105        for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3106                self = device->ldev->md.uuid[i] & ~((u64)1);
3107                for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3108                        peer = device->p_uuid[j] & ~((u64)1);
3109                        if (self == peer)
3110                                return -100;
3111                }
3112        }
3113
3114        return -1000;
3115}
3116
3117/* drbd_sync_handshake() returns the new conn state on success, or
3118   CONN_MASK (-1) on failure.
3119 */
3120static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3121                                           enum drbd_role peer_role,
3122                                           enum drbd_disk_state peer_disk) __must_hold(local)
3123{
3124        struct drbd_device *device = peer_device->device;
3125        enum drbd_conns rv = C_MASK;
3126        enum drbd_disk_state mydisk;
3127        struct net_conf *nc;
3128        int hg, rule_nr, rr_conflict, tentative;
3129
3130        mydisk = device->state.disk;
3131        if (mydisk == D_NEGOTIATING)
3132                mydisk = device->new_state_tmp.disk;
3133
3134        drbd_info(device, "drbd_sync_handshake:\n");
3135
3136        spin_lock_irq(&device->ldev->md.uuid_lock);
3137        drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3138        drbd_uuid_dump(device, "peer", device->p_uuid,
3139                       device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3140
3141        hg = drbd_uuid_compare(device, &rule_nr);
3142        spin_unlock_irq(&device->ldev->md.uuid_lock);
3143
3144        drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3145
3146        if (hg == -1000) {
3147                drbd_alert(device, "Unrelated data, aborting!\n");
3148                return C_MASK;
3149        }
3150        if (hg < -1000) {
3151                drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3152                return C_MASK;
3153        }
3154
3155        if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3156            (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3157                int f = (hg == -100) || abs(hg) == 2;
3158                hg = mydisk > D_INCONSISTENT ? 1 : -1;
3159                if (f)
3160                        hg = hg*2;
3161                drbd_info(device, "Becoming sync %s due to disk states.\n",
3162                     hg > 0 ? "source" : "target");
3163        }
3164
3165        if (abs(hg) == 100)
3166                drbd_khelper(device, "initial-split-brain");
3167
3168        rcu_read_lock();
3169        nc = rcu_dereference(peer_device->connection->net_conf);
3170
3171        if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3172                int pcount = (device->state.role == R_PRIMARY)
3173                           + (peer_role == R_PRIMARY);
3174                int forced = (hg == -100);
3175
3176                switch (pcount) {
3177                case 0:
3178                        hg = drbd_asb_recover_0p(peer_device);
3179                        break;
3180                case 1:
3181                        hg = drbd_asb_recover_1p(peer_device);
3182                        break;
3183                case 2:
3184                        hg = drbd_asb_recover_2p(peer_device);
3185                        break;
3186                }
3187                if (abs(hg) < 100) {
3188                        drbd_warn(device, "Split-Brain detected, %d primaries, "
3189                             "automatically solved. Sync from %s node\n",
3190                             pcount, (hg < 0) ? "peer" : "this");
3191                        if (forced) {
3192                                drbd_warn(device, "Doing a full sync, since"
3193                                     " UUIDs where ambiguous.\n");
3194                                hg = hg*2;
3195                        }
3196                }
3197        }
3198
3199        if (hg == -100) {
3200                if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3201                        hg = -1;
3202                if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3203                        hg = 1;
3204
3205                if (abs(hg) < 100)
3206                        drbd_warn(device, "Split-Brain detected, manually solved. "
3207                             "Sync from %s node\n",
3208                             (hg < 0) ? "peer" : "this");
3209        }
3210        rr_conflict = nc->rr_conflict;
3211        tentative = nc->tentative;
3212        rcu_read_unlock();
3213
3214        if (hg == -100) {
3215                /* FIXME this log message is not correct if we end up here
3216                 * after an attempted attach on a diskless node.
3217                 * We just refuse to attach -- well, we drop the "connection"
3218                 * to that disk, in a way... */
3219                drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3220                drbd_khelper(device, "split-brain");
3221                return C_MASK;
3222        }
3223
3224        if (hg > 0 && mydisk <= D_INCONSISTENT) {
3225                drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3226                return C_MASK;
3227        }
3228
3229        if (hg < 0 && /* by intention we do not use mydisk here. */
3230            device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3231                switch (rr_conflict) {
3232                case ASB_CALL_HELPER:
3233                        drbd_khelper(device, "pri-lost");
3234                        /* fall through */
3235                case ASB_DISCONNECT:
3236                        drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3237                        return C_MASK;
3238                case ASB_VIOLENTLY:
3239                        drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3240                             "assumption\n");
3241                }
3242        }
3243
3244        if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3245                if (hg == 0)
3246                        drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3247                else
3248                        drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3249                                 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3250                                 abs(hg) >= 2 ? "full" : "bit-map based");
3251                return C_MASK;
3252        }
3253
3254        if (abs(hg) >= 2) {
3255                drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3256                if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3257                                        BM_LOCKED_SET_ALLOWED))
3258                        return C_MASK;
3259        }
3260
3261        if (hg > 0) { /* become sync source. */
3262                rv = C_WF_BITMAP_S;
3263        } else if (hg < 0) { /* become sync target */
3264                rv = C_WF_BITMAP_T;
3265        } else {
3266                rv = C_CONNECTED;
3267                if (drbd_bm_total_weight(device)) {
3268                        drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3269                             drbd_bm_total_weight(device));
3270                }
3271        }
3272
3273        return rv;
3274}
3275
3276static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3277{
3278        /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3279        if (peer == ASB_DISCARD_REMOTE)
3280                return ASB_DISCARD_LOCAL;
3281
3282        /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3283        if (peer == ASB_DISCARD_LOCAL)
3284                return ASB_DISCARD_REMOTE;
3285
3286        /* everything else is valid if they are equal on both sides. */
3287        return peer;
3288}
3289
3290static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3291{
3292        struct p_protocol *p = pi->data;
3293        enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3294        int p_proto, p_discard_my_data, p_two_primaries, cf;
3295        struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3296        char integrity_alg[SHARED_SECRET_MAX] = "";
3297        struct crypto_hash *peer_integrity_tfm = NULL;
3298        void *int_dig_in = NULL, *int_dig_vv = NULL;
3299
3300        p_proto         = be32_to_cpu(p->protocol);
3301        p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3302        p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3303        p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3304        p_two_primaries = be32_to_cpu(p->two_primaries);
3305        cf              = be32_to_cpu(p->conn_flags);
3306        p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3307
3308        if (connection->agreed_pro_version >= 87) {
3309                int err;
3310
3311                if (pi->size > sizeof(integrity_alg))
3312                        return -EIO;
3313                err = drbd_recv_all(connection, integrity_alg, pi->size);
3314                if (err)
3315                        return err;
3316                integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3317        }
3318
3319        if (pi->cmd != P_PROTOCOL_UPDATE) {
3320                clear_bit(CONN_DRY_RUN, &connection->flags);
3321
3322                if (cf & CF_DRY_RUN)
3323                        set_bit(CONN_DRY_RUN, &connection->flags);
3324
3325                rcu_read_lock();
3326                nc = rcu_dereference(connection->net_conf);
3327
3328                if (p_proto != nc->wire_protocol) {
3329                        drbd_err(connection, "incompatible %s settings\n", "protocol");
3330                        goto disconnect_rcu_unlock;
3331                }
3332
3333                if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3334                        drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3335                        goto disconnect_rcu_unlock;
3336                }
3337
3338                if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3339                        drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3340                        goto disconnect_rcu_unlock;
3341                }
3342
3343                if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3344                        drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3345                        goto disconnect_rcu_unlock;
3346                }
3347
3348                if (p_discard_my_data && nc->discard_my_data) {
3349                        drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3350                        goto disconnect_rcu_unlock;
3351                }
3352
3353                if (p_two_primaries != nc->two_primaries) {
3354                        drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3355                        goto disconnect_rcu_unlock;
3356                }
3357
3358                if (strcmp(integrity_alg, nc->integrity_alg)) {
3359                        drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3360                        goto disconnect_rcu_unlock;
3361                }
3362
3363                rcu_read_unlock();
3364        }
3365
3366        if (integrity_alg[0]) {
3367                int hash_size;
3368
3369                /*
3370                 * We can only change the peer data integrity algorithm
3371                 * here.  Changing our own data integrity algorithm
3372                 * requires that we send a P_PROTOCOL_UPDATE packet at
3373                 * the same time; otherwise, the peer has no way to
3374                 * tell between which packets the algorithm should
3375                 * change.
3376                 */
3377
3378                peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3379                if (!peer_integrity_tfm) {
3380                        drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3381                                 integrity_alg);
3382                        goto disconnect;
3383                }
3384
3385                hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3386                int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3387                int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3388                if (!(int_dig_in && int_dig_vv)) {
3389                        drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3390                        goto disconnect;
3391                }
3392        }
3393
3394        new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3395        if (!new_net_conf) {
3396                drbd_err(connection, "Allocation of new net_conf failed\n");
3397                goto disconnect;
3398        }
3399
3400        mutex_lock(&connection->data.mutex);
3401        mutex_lock(&connection->resource->conf_update);
3402        old_net_conf = connection->net_conf;
3403        *new_net_conf = *old_net_conf;
3404
3405        new_net_conf->wire_protocol = p_proto;
3406        new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3407        new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3408        new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3409        new_net_conf->two_primaries = p_two_primaries;
3410
3411        rcu_assign_pointer(connection->net_conf, new_net_conf);
3412        mutex_unlock(&connection->resource->conf_update);
3413        mutex_unlock(&connection->data.mutex);
3414
3415        crypto_free_hash(connection->peer_integrity_tfm);
3416        kfree(connection->int_dig_in);
3417        kfree(connection->int_dig_vv);
3418        connection->peer_integrity_tfm = peer_integrity_tfm;
3419        connection->int_dig_in = int_dig_in;
3420        connection->int_dig_vv = int_dig_vv;
3421
3422        if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3423                drbd_info(connection, "peer data-integrity-alg: %s\n",
3424                          integrity_alg[0] ? integrity_alg : "(none)");
3425
3426        synchronize_rcu();
3427        kfree(old_net_conf);
3428        return 0;
3429
3430disconnect_rcu_unlock:
3431        rcu_read_unlock();
3432disconnect:
3433        crypto_free_hash(peer_integrity_tfm);
3434        kfree(int_dig_in);
3435        kfree(int_dig_vv);
3436        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3437        return -EIO;
3438}
3439
3440/* helper function
3441 * input: alg name, feature name
3442 * return: NULL (alg name was "")
3443 *         ERR_PTR(error) if something goes wrong
3444 *         or the crypto hash ptr, if it worked out ok. */
3445static struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3446                const char *alg, const char *name)
3447{
3448        struct crypto_hash *tfm;
3449
3450        if (!alg[0])
3451                return NULL;
3452
3453        tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3454        if (IS_ERR(tfm)) {
3455                drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3456                        alg, name, PTR_ERR(tfm));
3457                return tfm;
3458        }
3459        return tfm;
3460}
3461
3462static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3463{
3464        void *buffer = connection->data.rbuf;
3465        int size = pi->size;
3466
3467        while (size) {
3468                int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3469                s = drbd_recv(connection, buffer, s);
3470                if (s <= 0) {
3471                        if (s < 0)
3472                                return s;
3473                        break;
3474                }
3475                size -= s;
3476        }
3477        if (size)
3478                return -EIO;
3479        return 0;
3480}
3481
3482/*
3483 * config_unknown_volume  -  device configuration command for unknown volume
3484 *
3485 * When a device is added to an existing connection, the node on which the
3486 * device is added first will send configuration commands to its peer but the
3487 * peer will not know about the device yet.  It will warn and ignore these
3488 * commands.  Once the device is added on the second node, the second node will
3489 * send the same device configuration commands, but in the other direction.
3490 *
3491 * (We can also end up here if drbd is misconfigured.)
3492 */
3493static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3494{
3495        drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3496                  cmdname(pi->cmd), pi->vnr);
3497        return ignore_remaining_packet(connection, pi);
3498}
3499
3500static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3501{
3502        struct drbd_peer_device *peer_device;
3503        struct drbd_device *device;
3504        struct p_rs_param_95 *p;
3505        unsigned int header_size, data_size, exp_max_sz;
3506        struct crypto_hash *verify_tfm = NULL;
3507        struct crypto_hash *csums_tfm = NULL;
3508        struct net_conf *old_net_conf, *new_net_conf = NULL;
3509        struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3510        const int apv = connection->agreed_pro_version;
3511        struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3512        int fifo_size = 0;
3513        int err;
3514
3515        peer_device = conn_peer_device(connection, pi->vnr);
3516        if (!peer_device)
3517                return config_unknown_volume(connection, pi);
3518        device = peer_device->device;
3519
3520        exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3521                    : apv == 88 ? sizeof(struct p_rs_param)
3522                                        + SHARED_SECRET_MAX
3523                    : apv <= 94 ? sizeof(struct p_rs_param_89)
3524                    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3525
3526        if (pi->size > exp_max_sz) {
3527                drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3528                    pi->size, exp_max_sz);
3529                return -EIO;
3530        }
3531
3532        if (apv <= 88) {
3533                header_size = sizeof(struct p_rs_param);
3534                data_size = pi->size - header_size;
3535        } else if (apv <= 94) {
3536                header_size = sizeof(struct p_rs_param_89);
3537                data_size = pi->size - header_size;
3538                D_ASSERT(device, data_size == 0);
3539        } else {
3540                header_size = sizeof(struct p_rs_param_95);
3541                data_size = pi->size - header_size;
3542                D_ASSERT(device, data_size == 0);
3543        }
3544
3545        /* initialize verify_alg and csums_alg */
3546        p = pi->data;
3547        memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3548
3549        err = drbd_recv_all(peer_device->connection, p, header_size);
3550        if (err)
3551                return err;
3552
3553        mutex_lock(&connection->resource->conf_update);
3554        old_net_conf = peer_device->connection->net_conf;
3555        if (get_ldev(device)) {
3556                new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3557                if (!new_disk_conf) {
3558                        put_ldev(device);
3559                        mutex_unlock(&connection->resource->conf_update);
3560                        drbd_err(device, "Allocation of new disk_conf failed\n");
3561                        return -ENOMEM;
3562                }
3563
3564                old_disk_conf = device->ldev->disk_conf;
3565                *new_disk_conf = *old_disk_conf;
3566
3567                new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3568        }
3569
3570        if (apv >= 88) {
3571                if (apv == 88) {
3572                        if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3573                                drbd_err(device, "verify-alg of wrong size, "
3574                                        "peer wants %u, accepting only up to %u byte\n",
3575                                        data_size, SHARED_SECRET_MAX);
3576                                err = -EIO;
3577                                goto reconnect;
3578                        }
3579
3580                        err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3581                        if (err)
3582                                goto reconnect;
3583                        /* we expect NUL terminated string */
3584                        /* but just in case someone tries to be evil */
3585                        D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3586                        p->verify_alg[data_size-1] = 0;
3587
3588                } else /* apv >= 89 */ {
3589                        /* we still expect NUL terminated strings */
3590                        /* but just in case someone tries to be evil */
3591                        D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3592                        D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3593                        p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3594                        p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3595                }
3596
3597                if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3598                        if (device->state.conn == C_WF_REPORT_PARAMS) {
3599                                drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3600                                    old_net_conf->verify_alg, p->verify_alg);
3601                                goto disconnect;
3602                        }
3603                        verify_tfm = drbd_crypto_alloc_digest_safe(device,
3604                                        p->verify_alg, "verify-alg");
3605                        if (IS_ERR(verify_tfm)) {
3606                                verify_tfm = NULL;
3607                                goto disconnect;
3608                        }
3609                }
3610
3611                if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3612                        if (device->state.conn == C_WF_REPORT_PARAMS) {
3613                                drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3614                                    old_net_conf->csums_alg, p->csums_alg);
3615                                goto disconnect;
3616                        }
3617                        csums_tfm = drbd_crypto_alloc_digest_safe(device,
3618                                        p->csums_alg, "csums-alg");
3619                        if (IS_ERR(csums_tfm)) {
3620                                csums_tfm = NULL;
3621                                goto disconnect;
3622                        }
3623                }
3624
3625                if (apv > 94 && new_disk_conf) {
3626                        new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3627                        new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3628                        new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3629                        new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3630
3631                        fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3632                        if (fifo_size != device->rs_plan_s->size) {
3633                                new_plan = fifo_alloc(fifo_size);
3634                                if (!new_plan) {
3635                                        drbd_err(device, "kmalloc of fifo_buffer failed");
3636                                        put_ldev(device);
3637                                        goto disconnect;
3638                                }
3639                        }
3640                }
3641
3642                if (verify_tfm || csums_tfm) {
3643                        new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3644                        if (!new_net_conf) {
3645                                drbd_err(device, "Allocation of new net_conf failed\n");
3646                                goto disconnect;
3647                        }
3648
3649                        *new_net_conf = *old_net_conf;
3650
3651                        if (verify_tfm) {
3652                                strcpy(new_net_conf->verify_alg, p->verify_alg);
3653                                new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3654                                crypto_free_hash(peer_device->connection->verify_tfm);
3655                                peer_device->connection->verify_tfm = verify_tfm;
3656                                drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3657                        }
3658                        if (csums_tfm) {
3659                                strcpy(new_net_conf->csums_alg, p->csums_alg);
3660                                new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3661                                crypto_free_hash(peer_device->connection->csums_tfm);
3662                                peer_device->connection->csums_tfm = csums_tfm;
3663                                drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3664                        }
3665                        rcu_assign_pointer(connection->net_conf, new_net_conf);
3666                }
3667        }
3668
3669        if (new_disk_conf) {
3670                rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3671                put_ldev(device);
3672        }
3673
3674        if (new_plan) {
3675                old_plan = device->rs_plan_s;
3676                rcu_assign_pointer(device->rs_plan_s, new_plan);
3677        }
3678
3679        mutex_unlock(&connection->resource->conf_update);
3680        synchronize_rcu();
3681        if (new_net_conf)
3682                kfree(old_net_conf);
3683        kfree(old_disk_conf);
3684        kfree(old_plan);
3685
3686        return 0;
3687
3688reconnect:
3689        if (new_disk_conf) {
3690                put_ldev(device);
3691                kfree(new_disk_conf);
3692        }
3693        mutex_unlock(&connection->resource->conf_update);
3694        return -EIO;
3695
3696disconnect:
3697        kfree(new_plan);
3698        if (new_disk_conf) {
3699                put_ldev(device);
3700                kfree(new_disk_conf);
3701        }
3702        mutex_unlock(&connection->resource->conf_update);
3703        /* just for completeness: actually not needed,
3704         * as this is not reached if csums_tfm was ok. */
3705        crypto_free_hash(csums_tfm);
3706        /* but free the verify_tfm again, if csums_tfm did not work out */
3707        crypto_free_hash(verify_tfm);
3708        conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3709        return -EIO;
3710}
3711
3712/* warn if the arguments differ by more than 12.5% */
3713static void warn_if_differ_considerably(struct drbd_device *device,
3714        const char *s, sector_t a, sector_t b)
3715{
3716        sector_t d;
3717        if (a == 0 || b == 0)
3718                return;
3719        d = (a > b) ? (a - b) : (b - a);
3720        if (d > (a>>3) || d > (b>>3))
3721                drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3722                     (unsigned long long)a, (unsigned long long)b);
3723}
3724
3725static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3726{
3727        struct drbd_peer_device *peer_device;
3728        struct drbd_device *device;
3729        struct p_sizes *p = pi->data;
3730        enum determine_dev_size dd = DS_UNCHANGED;
3731        sector_t p_size, p_usize, p_csize, my_usize;
3732        int ldsc = 0; /* local disk size changed */
3733        enum dds_flags ddsf;
3734
3735        peer_device = conn_peer_device(connection, pi->vnr);
3736        if (!peer_device)
3737                return config_unknown_volume(connection, pi);
3738        device = peer_device->device;
3739
3740        p_size = be64_to_cpu(p->d_size);
3741        p_usize = be64_to_cpu(p->u_size);
3742        p_csize = be64_to_cpu(p->c_size);
3743
3744        /* just store the peer's disk size for now.
3745         * we still need to figure out whether we accept that. */
3746        device->p_size = p_size;
3747
3748        if (get_ldev(device)) {
3749                rcu_read_lock();
3750                my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
3751                rcu_read_unlock();
3752
3753                warn_if_differ_considerably(device, "lower level device sizes",
3754                           p_size, drbd_get_max_capacity(device->ldev));
3755                warn_if_differ_considerably(device, "user requested size",
3756                                            p_usize, my_usize);
3757
3758                /* if this is the first connect, or an otherwise expected
3759                 * param exchange, choose the minimum */
3760                if (device->state.conn == C_WF_REPORT_PARAMS)
3761                        p_usize = min_not_zero(my_usize, p_usize);
3762
3763                /* Never shrink a device with usable data during connect.
3764                   But allow online shrinking if we are connected. */
3765                if (drbd_new_dev_size(device, device->ldev, p_usize, 0) <
3766                    drbd_get_capacity(device->this_bdev) &&
3767                    device->state.disk >= D_OUTDATED &&
3768                    device->state.conn < C_CONNECTED) {
3769                        drbd_err(device, "The peer's disk size is too small!\n");
3770                        conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3771                        put_ldev(device);
3772                        return -EIO;
3773                }
3774
3775                if (my_usize != p_usize) {
3776                        struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3777
3778                        new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3779                        if (!new_disk_conf) {
3780                                drbd_err(device, "Allocation of new disk_conf failed\n");
3781                                put_ldev(device);
3782                                return -ENOMEM;
3783                        }
3784
3785                        mutex_lock(&connection->resource->conf_update);
3786                        old_disk_conf = device->ldev->disk_conf;
3787                        *new_disk_conf = *old_disk_conf;
3788                        new_disk_conf->disk_size = p_usize;
3789
3790                        rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3791                        mutex_unlock(&connection->resource->conf_update);
3792                        synchronize_rcu();
3793                        kfree(old_disk_conf);
3794
3795                        drbd_info(device, "Peer sets u_size to %lu sectors\n",
3796                                 (unsigned long)my_usize);
3797                }
3798
3799                put_ldev(device);
3800        }
3801
3802        device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3803        /* Leave drbd_reconsider_max_bio_size() before drbd_determine_dev_size().
3804           In case we cleared the QUEUE_FLAG_DISCARD from our queue in
3805           drbd_reconsider_max_bio_size(), we can be sure that after
3806           drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
3807
3808        ddsf = be16_to_cpu(p->dds_flags);
3809        if (get_ldev(device)) {
3810                drbd_reconsider_max_bio_size(device, device->ldev);
3811                dd = drbd_determine_dev_size(device, ddsf, NULL);
3812                put_ldev(device);
3813                if (dd == DS_ERROR)
3814                        return -EIO;
3815                drbd_md_sync(device);
3816        } else {
3817                /*
3818                 * I am diskless, need to accept the peer's *current* size.
3819                 * I must NOT accept the peers backing disk size,
3820                 * it may have been larger than mine all along...
3821                 *
3822                 * At this point, the peer knows more about my disk, or at
3823                 * least about what we last agreed upon, than myself.
3824                 * So if his c_size is less than his d_size, the most likely
3825                 * reason is that *my* d_size was smaller last time we checked.
3826                 *
3827                 * However, if he sends a zero current size,
3828                 * take his (user-capped or) backing disk size anyways.
3829                 */
3830                drbd_reconsider_max_bio_size(device, NULL);
3831                drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size);
3832        }
3833
3834        if (get_ldev(device)) {
3835                if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
3836                        device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
3837                        ldsc = 1;
3838                }
3839
3840                put_ldev(device);
3841        }
3842
3843        if (device->state.conn > C_WF_REPORT_PARAMS) {
3844                if (be64_to_cpu(p->c_size) !=
3845                    drbd_get_capacity(device->this_bdev) || ldsc) {
3846                        /* we have different sizes, probably peer
3847                         * needs to know my new size... */
3848                        drbd_send_sizes(peer_device, 0, ddsf);
3849                }
3850                if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
3851                    (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
3852                        if (device->state.pdsk >= D_INCONSISTENT &&
3853                            device->state.disk >= D_INCONSISTENT) {
3854                                if (ddsf & DDSF_NO_RESYNC)
3855                                        drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
3856                                else
3857                                        resync_after_online_grow(device);
3858                        } else
3859                                set_bit(RESYNC_AFTER_NEG, &device->flags);
3860                }
3861        }
3862
3863        return 0;
3864}
3865
3866static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
3867{
3868        struct drbd_peer_device *peer_device;
3869        struct drbd_device *device;
3870        struct p_uuids *p = pi->data;
3871        u64 *p_uuid;
3872        int i, updated_uuids = 0;
3873
3874        peer_device = conn_peer_device(connection, pi->vnr);
3875        if (!peer_device)
3876                return config_unknown_volume(connection, pi);
3877        device = peer_device->device;
3878
3879        p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3880        if (!p_uuid) {
3881                drbd_err(device, "kmalloc of p_uuid failed\n");
3882                return false;
3883        }
3884
3885        for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3886                p_uuid[i] = be64_to_cpu(p->uuid[i]);
3887
3888        kfree(device->p_uuid);
3889        device->p_uuid = p_uuid;
3890
3891        if (device->state.conn < C_CONNECTED &&
3892            device->state.disk < D_INCONSISTENT &&
3893            device->state.role == R_PRIMARY &&
3894            (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3895                drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
3896                    (unsigned long long)device->ed_uuid);
3897                conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3898                return -EIO;
3899        }
3900
3901        if (get_ldev(device)) {
3902                int skip_initial_sync =
3903                        device->state.conn == C_CONNECTED &&
3904                        peer_device->connection->agreed_pro_version >= 90 &&
3905                        device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3906                        (p_uuid[UI_FLAGS] & 8);
3907                if (skip_initial_sync) {
3908                        drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
3909                        drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
3910                                        "clear_n_write from receive_uuids",
3911                                        BM_LOCKED_TEST_ALLOWED);
3912                        _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
3913                        _drbd_uuid_set(device, UI_BITMAP, 0);
3914                        _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3915                                        CS_VERBOSE, NULL);
3916                        drbd_md_sync(device);
3917                        updated_uuids = 1;
3918                }
3919                put_ldev(device);
3920        } else if (device->state.disk < D_INCONSISTENT &&
3921                   device->state.role == R_PRIMARY) {
3922                /* I am a diskless primary, the peer just created a new current UUID
3923                   for me. */
3924                updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3925        }
3926
3927        /* Before we test for the disk state, we should wait until an eventually
3928           ongoing cluster wide state change is finished. That is important if
3929           we are primary and are detaching from our disk. We need to see the
3930           new disk state... */
3931        mutex_lock(device->state_mutex);
3932        mutex_unlock(device->state_mutex);
3933        if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
3934                updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3935
3936        if (updated_uuids)
3937                drbd_print_uuids(device, "receiver updated UUIDs to");
3938
3939        return 0;
3940}
3941
3942/**
3943 * convert_state() - Converts the peer's view of the cluster state to our point of view
3944 * @ps:         The state as seen by the peer.
3945 */
3946static union drbd_state convert_state(union drbd_state ps)
3947{
3948        union drbd_state ms;
3949
3950        static enum drbd_conns c_tab[] = {
3951                [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3952                [C_CONNECTED] = C_CONNECTED,
3953
3954                [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3955                [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3956                [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3957                [C_VERIFY_S]       = C_VERIFY_T,
3958                [C_MASK]   = C_MASK,
3959        };
3960
3961        ms.i = ps.i;
3962
3963        ms.conn = c_tab[ps.conn];
3964        ms.peer = ps.role;
3965        ms.role = ps.peer;
3966        ms.pdsk = ps.disk;
3967        ms.disk = ps.pdsk;
3968        ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3969
3970        return ms;
3971}
3972
3973static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
3974{
3975        struct drbd_peer_device *peer_device;
3976        struct drbd_device *device;
3977        struct p_req_state *p = pi->data;
3978        union drbd_state mask, val;
3979        enum drbd_state_rv rv;
3980
3981        peer_device = conn_peer_device(connection, pi->vnr);
3982        if (!peer_device)
3983                return -EIO;
3984        device = peer_device->device;
3985
3986        mask.i = be32_to_cpu(p->mask);
3987        val.i = be32_to_cpu(p->val);
3988
3989        if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
3990            mutex_is_locked(device->state_mutex)) {
3991                drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
3992                return 0;
3993        }
3994
3995        mask = convert_state(mask);
3996        val = convert_state(val);
3997
3998        rv = drbd_change_state(device, CS_VERBOSE, mask, val);
3999        drbd_send_sr_reply(peer_device, rv);
4000
4001        drbd_md_sync(device);
4002
4003        return 0;
4004}
4005
4006static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4007{
4008        struct p_req_state *p = pi->data;
4009        union drbd_state mask, val;
4010        enum drbd_state_rv rv;
4011
4012        mask.i = be32_to_cpu(p->mask);
4013        val.i = be32_to_cpu(p->val);
4014
4015        if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4016            mutex_is_locked(&connection->cstate_mutex)) {
4017                conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4018                return 0;
4019        }
4020
4021        mask = convert_state(mask);
4022        val = convert_state(val);
4023
4024        rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4025        conn_send_sr_reply(connection, rv);
4026
4027        return 0;
4028}
4029
4030static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4031{
4032        struct drbd_peer_device *peer_device;
4033        struct drbd_device *device;
4034        struct p_state *p = pi->data;
4035        union drbd_state os, ns, peer_state;
4036        enum drbd_disk_state real_peer_disk;
4037        enum chg_state_flags cs_flags;
4038        int rv;
4039
4040        peer_device = conn_peer_device(connection, pi->vnr);
4041        if (!peer_device)
4042                return config_unknown_volume(connection, pi);
4043        device = peer_device->device;
4044
4045        peer_state.i = be32_to_cpu(p->state);
4046
4047        real_peer_disk = peer_state.disk;
4048        if (peer_state.disk == D_NEGOTIATING) {
4049                real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4050                drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4051        }
4052
4053        spin_lock_irq(&device->resource->req_lock);
4054 retry:
4055        os = ns = drbd_read_state(device);
4056        spin_unlock_irq(&device->resource->req_lock);
4057
4058        /* If some other part of the code (asender thread, timeout)
4059         * already decided to close the connection again,
4060         * we must not "re-establish" it here. */
4061        if (os.conn <= C_TEAR_DOWN)
4062                return -ECONNRESET;
4063
4064        /* If this is the "end of sync" confirmation, usually the peer disk
4065         * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4066         * set) resync started in PausedSyncT, or if the timing of pause-/
4067         * unpause-sync events has been "just right", the peer disk may
4068         * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4069         */
4070        if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4071            real_peer_disk == D_UP_TO_DATE &&
4072            os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4073                /* If we are (becoming) SyncSource, but peer is still in sync
4074                 * preparation, ignore its uptodate-ness to avoid flapping, it
4075                 * will change to inconsistent once the peer reaches active
4076                 * syncing states.
4077                 * It may have changed syncer-paused flags, however, so we
4078                 * cannot ignore this completely. */
4079                if (peer_state.conn > C_CONNECTED &&
4080                    peer_state.conn < C_SYNC_SOURCE)
4081                        real_peer_disk = D_INCONSISTENT;
4082
4083                /* if peer_state changes to connected at the same time,
4084                 * it explicitly notifies us that it finished resync.
4085                 * Maybe we should finish it up, too? */
4086                else if (os.conn >= C_SYNC_SOURCE &&
4087                         peer_state.conn == C_CONNECTED) {
4088                        if (drbd_bm_total_weight(device) <= device->rs_failed)
4089                                drbd_resync_finished(device);
4090                        return 0;
4091                }
4092        }
4093
4094        /* explicit verify finished notification, stop sector reached. */
4095        if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4096            peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4097                ov_out_of_sync_print(device);
4098                drbd_resync_finished(device);
4099                return 0;
4100        }
4101
4102        /* peer says his disk is inconsistent, while we think it is uptodate,
4103         * and this happens while the peer still thinks we have a sync going on,
4104         * but we think we are already done with the sync.
4105         * We ignore this to avoid flapping pdsk.
4106         * This should not happen, if the peer is a recent version of drbd. */
4107        if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4108            os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4109                real_peer_disk = D_UP_TO_DATE;
4110
4111        if (ns.conn == C_WF_REPORT_PARAMS)
4112                ns.conn = C_CONNECTED;
4113
4114        if (peer_state.conn == C_AHEAD)
4115                ns.conn = C_BEHIND;
4116
4117        if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4118            get_ldev_if_state(device, D_NEGOTIATING)) {
4119                int cr; /* consider resync */
4120
4121                /* if we established a new connection */
4122                cr  = (os.conn < C_CONNECTED);
4123                /* if we had an established connection
4124                 * and one of the nodes newly attaches a disk */
4125                cr |= (os.conn == C_CONNECTED &&
4126                       (peer_state.disk == D_NEGOTIATING ||
4127                        os.disk == D_NEGOTIATING));
4128                /* if we have both been inconsistent, and the peer has been
4129                 * forced to be UpToDate with --overwrite-data */
4130                cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4131                /* if we had been plain connected, and the admin requested to
4132                 * start a sync by "invalidate" or "invalidate-remote" */
4133                cr |= (os.conn == C_CONNECTED &&
4134                                (peer_state.conn >= C_STARTING_SYNC_S &&
4135                                 peer_state.conn <= C_WF_BITMAP_T));
4136
4137                if (cr)
4138                        ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4139
4140                put_ldev(device);
4141                if (ns.conn == C_MASK) {
4142                        ns.conn = C_CONNECTED;
4143                        if (device->state.disk == D_NEGOTIATING) {
4144                                drbd_force_state(device, NS(disk, D_FAILED));
4145                        } else if (peer_state.disk == D_NEGOTIATING) {
4146                                drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4147                                peer_state.disk = D_DISKLESS;
4148                                real_peer_disk = D_DISKLESS;
4149                        } else {
4150                                if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4151                                        return -EIO;
4152                                D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4153                                conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4154                                return -EIO;
4155                        }
4156                }
4157        }
4158
4159        spin_lock_irq(&device->resource->req_lock);
4160        if (os.i != drbd_read_state(device).i)
4161                goto retry;
4162        clear_bit(CONSIDER_RESYNC, &device->flags);
4163        ns.peer = peer_state.role;
4164        ns.pdsk = real_peer_disk;
4165        ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4166        if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4167                ns.disk = device->new_state_tmp.disk;
4168        cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4169        if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4170            test_bit(NEW_CUR_UUID, &device->flags)) {
4171                /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4172                   for temporal network outages! */
4173                spin_unlock_irq(&device->resource->req_lock);
4174                drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4175                tl_clear(peer_device->connection);
4176                drbd_uuid_new_current(device);
4177                clear_bit(NEW_CUR_UUID, &device->flags);
4178                conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4179                return -EIO;
4180        }
4181        rv = _drbd_set_state(device, ns, cs_flags, NULL);
4182        ns = drbd_read_state(device);
4183        spin_unlock_irq(&device->resource->req_lock);
4184
4185        if (rv < SS_SUCCESS) {
4186                conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4187                return -EIO;
4188        }
4189
4190        if (os.conn > C_WF_REPORT_PARAMS) {
4191                if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4192                    peer_state.disk != D_NEGOTIATING ) {
4193                        /* we want resync, peer has not yet decided to sync... */
4194                        /* Nowadays only used when forcing a node into primary role and
4195                           setting its disk to UpToDate with that */
4196                        drbd_send_uuids(peer_device);
4197                        drbd_send_current_state(peer_device);
4198                }
4199        }
4200
4201        clear_bit(DISCARD_MY_DATA, &device->flags);
4202
4203        drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4204
4205        return 0;
4206}
4207
4208static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4209{
4210        struct drbd_peer_device *peer_device;
4211        struct drbd_device *device;
4212        struct p_rs_uuid *p = pi->data;
4213
4214        peer_device = conn_peer_device(connection, pi->vnr);
4215        if (!peer_device)
4216                return -EIO;
4217        device = peer_device->device;
4218
4219        wait_event(device->misc_wait,
4220                   device->state.conn == C_WF_SYNC_UUID ||
4221                   device->state.conn == C_BEHIND ||
4222                   device->state.conn < C_CONNECTED ||
4223                   device->state.disk < D_NEGOTIATING);
4224
4225        /* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4226
4227        /* Here the _drbd_uuid_ functions are right, current should
4228           _not_ be rotated into the history */
4229        if (get_ldev_if_state(device, D_NEGOTIATING)) {
4230                _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4231                _drbd_uuid_set(device, UI_BITMAP, 0UL);
4232
4233                drbd_print_uuids(device, "updated sync uuid");
4234                drbd_start_resync(device, C_SYNC_TARGET);
4235
4236                put_ldev(device);
4237        } else
4238                drbd_err(device, "Ignoring SyncUUID packet!\n");
4239
4240        return 0;
4241}
4242
4243/**
4244 * receive_bitmap_plain
4245 *
4246 * Return 0 when done, 1 when another iteration is needed, and a negative error
4247 * code upon failure.
4248 */
4249static int
4250receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4251                     unsigned long *p, struct bm_xfer_ctx *c)
4252{
4253        unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4254                                 drbd_header_size(peer_device->connection);
4255        unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4256                                       c->bm_words - c->word_offset);
4257        unsigned int want = num_words * sizeof(*p);
4258        int err;
4259
4260        if (want != size) {
4261                drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4262                return -EIO;
4263        }
4264        if (want == 0)
4265                return 0;
4266        err = drbd_recv_all(peer_device->connection, p, want);
4267        if (err)
4268                return err;
4269
4270        drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4271
4272        c->word_offset += num_words;
4273        c->bit_offset = c->word_offset * BITS_PER_LONG;
4274        if (c->bit_offset > c->bm_bits)
4275                c->bit_offset = c->bm_bits;
4276
4277        return 1;
4278}
4279
4280static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4281{
4282        return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4283}
4284
4285static int dcbp_get_start(struct p_compressed_bm *p)
4286{
4287        return (p->encoding & 0x80) != 0;
4288}
4289
4290static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4291{
4292        return (p->encoding >> 4) & 0x7;
4293}
4294
4295/**
4296 * recv_bm_rle_bits
4297 *
4298 * Return 0 when done, 1 when another iteration is needed, and a negative error
4299 * code upon failure.
4300 */
4301static int
4302recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4303                struct p_compressed_bm *p,
4304                 struct bm_xfer_ctx *c,
4305                 unsigned int len)
4306{
4307        struct bitstream bs;
4308        u64 look_ahead;
4309        u64 rl;
4310        u64 tmp;
4311        unsigned long s = c->bit_offset;
4312        unsigned long e;
4313        int toggle = dcbp_get_start(p);
4314        int have;
4315        int bits;
4316
4317        bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4318
4319        bits = bitstream_get_bits(&bs, &look_ahead, 64);
4320        if (bits < 0)
4321                return -EIO;
4322
4323        for (have = bits; have > 0; s += rl, toggle = !toggle) {
4324                bits = vli_decode_bits(&rl, look_ahead);
4325                if (bits <= 0)
4326                        return -EIO;
4327
4328                if (toggle) {
4329                        e = s + rl -1;
4330                        if (e >= c->bm_bits) {
4331                                drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4332                                return -EIO;
4333                        }
4334                        _drbd_bm_set_bits(peer_device->device, s, e);
4335                }
4336
4337                if (have < bits) {
4338                        drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4339                                have, bits, look_ahead,
4340                                (unsigned int)(bs.cur.b - p->code),
4341                                (unsigned int)bs.buf_len);
4342                        return -EIO;
4343                }
4344                /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4345                if (likely(bits < 64))
4346                        look_ahead >>= bits;
4347                else
4348                        look_ahead = 0;
4349                have -= bits;
4350
4351                bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4352                if (bits < 0)
4353                        return -EIO;
4354                look_ahead |= tmp << have;
4355                have += bits;
4356        }
4357
4358        c->bit_offset = s;
4359        bm_xfer_ctx_bit_to_word_offset(c);
4360
4361        return (s != c->bm_bits);
4362}
4363
4364/**
4365 * decode_bitmap_c
4366 *
4367 * Return 0 when done, 1 when another iteration is needed, and a negative error
4368 * code upon failure.
4369 */
4370static int
4371decode_bitmap_c(struct drbd_peer_device *peer_device,
4372                struct p_compressed_bm *p,
4373                struct bm_xfer_ctx *c,
4374                unsigned int len)
4375{
4376        if (dcbp_get_code(p) == RLE_VLI_Bits)
4377                return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4378
4379        /* other variants had been implemented for evaluation,
4380         * but have been dropped as this one turned out to be "best"
4381         * during all our tests. */
4382
4383        drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4384        conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4385        return -EIO;
4386}
4387
4388void INFO_bm_xfer_stats(struct drbd_device *device,
4389                const char *direction, struct bm_xfer_ctx *c)
4390{
4391        /* what would it take to transfer it "plaintext" */
4392        unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4393        unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4394        unsigned int plain =
4395                header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4396                c->bm_words * sizeof(unsigned long);
4397        unsigned int total = c->bytes[0] + c->bytes[1];
4398        unsigned int r;
4399
4400        /* total can not be zero. but just in case: */
4401        if (total == 0)
4402                return;
4403
4404        /* don't report if not compressed */
4405        if (total >= plain)
4406                return;
4407
4408        /* total < plain. check for overflow, still */
4409        r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4410                                    : (1000 * total / plain);
4411
4412        if (r > 1000)
4413                r = 1000;
4414
4415        r = 1000 - r;
4416        drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4417             "total %u; compression: %u.%u%%\n",
4418                        direction,
4419                        c->bytes[1], c->packets[1],
4420                        c->bytes[0], c->packets[0],
4421                        total, r/10, r % 10);
4422}
4423
4424/* Since we are processing the bitfield from lower addresses to higher,
4425   it does not matter if the process it in 32 bit chunks or 64 bit
4426   chunks as long as it is little endian. (Understand it as byte stream,
4427   beginning with the lowest byte...) If we would use big endian
4428   we would need to process it from the highest address to the lowest,
4429   in order to be agnostic to the 32 vs 64 bits issue.
4430
4431   returns 0 on failure, 1 if we successfully received it. */
4432static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4433{
4434        struct drbd_peer_device *peer_device;
4435        struct drbd_device *device;
4436        struct bm_xfer_ctx c;
4437        int err;
4438
4439        peer_device = conn_peer_device(connection, pi->vnr);
4440        if (!peer_device)
4441                return -EIO;
4442        device = peer_device->device;
4443
4444        drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4445        /* you are supposed to send additional out-of-sync information
4446         * if you actually set bits during this phase */
4447
4448        c = (struct bm_xfer_ctx) {
4449                .bm_bits = drbd_bm_bits(device),
4450                .bm_words = drbd_bm_words(device),
4451        };
4452
4453        for(;;) {
4454                if (pi->cmd == P_BITMAP)
4455                        err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4456                else if (pi->cmd == P_COMPRESSED_BITMAP) {
4457                        /* MAYBE: sanity check that we speak proto >= 90,
4458                         * and the feature is enabled! */
4459                        struct p_compressed_bm *p = pi->data;
4460
4461                        if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4462                                drbd_err(device, "ReportCBitmap packet too large\n");
4463                                err = -EIO;
4464                                goto out;
4465                        }
4466                        if (pi->size <= sizeof(*p)) {
4467                                drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4468                                err = -EIO;
4469                                goto out;
4470                        }
4471                        err = drbd_recv_all(peer_device->connection, p, pi->size);
4472                        if (err)
4473                               goto out;
4474                        err = decode_bitmap_c(peer_device, p, &c, pi->size);
4475                } else {
4476                        drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4477                        err = -EIO;
4478                        goto out;
4479                }
4480
4481                c.packets[pi->cmd == P_BITMAP]++;
4482                c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4483
4484                if (err <= 0) {
4485                        if (err < 0)
4486                                goto out;
4487                        break;
4488                }
4489                err = drbd_recv_header(peer_device->connection, pi);
4490                if (err)
4491                        goto out;
4492        }
4493
4494        INFO_bm_xfer_stats(device, "receive", &c);
4495
4496        if (device->state.conn == C_WF_BITMAP_T) {
4497                enum drbd_state_rv rv;
4498
4499                err = drbd_send_bitmap(device);
4500                if (err)
4501                        goto out;
4502                /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4503                rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4504                D_ASSERT(device, rv == SS_SUCCESS);
4505        } else if (device->state.conn != C_WF_BITMAP_S) {
4506                /* admin may have requested C_DISCONNECTING,
4507                 * other threads may have noticed network errors */
4508                drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4509                    drbd_conn_str(device->state.conn));
4510        }
4511        err = 0;
4512
4513 out:
4514        drbd_bm_unlock(device);
4515        if (!err && device->state.conn == C_WF_BITMAP_S)
4516                drbd_start_resync(device, C_SYNC_SOURCE);
4517        return err;
4518}
4519
4520static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4521{
4522        drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4523                 pi->cmd, pi->size);
4524
4525        return ignore_remaining_packet(connection, pi);
4526}
4527
4528static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4529{
4530        /* Make sure we've acked all the TCP data associated
4531         * with the data requests being unplugged */
4532        drbd_tcp_quickack(connection->data.socket);
4533
4534        return 0;
4535}
4536
4537static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4538{
4539        struct drbd_peer_device *peer_device;
4540        struct drbd_device *device;
4541        struct p_block_desc *p = pi->data;
4542
4543        peer_device = conn_peer_device(connection, pi->vnr);
4544        if (!peer_device)
4545                return -EIO;
4546        device = peer_device->device;
4547
4548        switch (device->state.conn) {
4549        case C_WF_SYNC_UUID:
4550        case C_WF_BITMAP_T:
4551        case C_BEHIND:
4552                        break;
4553        default:
4554                drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4555                                drbd_conn_str(device->state.conn));
4556        }
4557
4558        drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4559
4560        return 0;
4561}
4562
/*
 * One entry of the data-socket dispatch table (drbd_cmd_handler[]).
 * The member order is relied upon by positional initializers.
 */
struct data_cmd {
        /* non-zero: the packet may be longer than pkt_size */
        int expect_payload;
        /* number of bytes drbdd() receives before calling fn */
        size_t pkt_size;
        /* handler; a non-zero return value makes drbdd() stop receiving */
        int (*fn)(struct drbd_connection *, struct packet_info *);
};
4568
4569static struct data_cmd drbd_cmd_handler[] = {
4570        [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4571        [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4572        [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4573        [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4574        [P_BITMAP]          = { 1, 0, receive_bitmap } ,
4575        [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4576        [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4577        [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4578        [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4579        [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4580        [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4581        [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4582        [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4583        [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4584        [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4585        [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4586        [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4587        [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4588        [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4589        [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4590        [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4591        [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4592        [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4593        [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4594        [P_TRIM]            = { 0, sizeof(struct p_trim), receive_Data },
4595};
4596
4597static void drbdd(struct drbd_connection *connection)
4598{
4599        struct packet_info pi;
4600        size_t shs; /* sub header size */
4601        int err;
4602
4603        while (get_t_state(&connection->receiver) == RUNNING) {
4604                struct data_cmd *cmd;
4605
4606                drbd_thread_current_set_cpu(&connection->receiver);
4607                update_receiver_timing_details(connection, drbd_recv_header);
4608                if (drbd_recv_header(connection, &pi))
4609                        goto err_out;
4610
4611                cmd = &drbd_cmd_handler[pi.cmd];
4612                if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4613                        drbd_err(connection, "Unexpected data packet %s (0x%04x)",
4614                                 cmdname(pi.cmd), pi.cmd);
4615                        goto err_out;
4616                }
4617
4618                shs = cmd->pkt_size;
4619                if (pi.size > shs && !cmd->expect_payload) {
4620                        drbd_err(connection, "No payload expected %s l:%d\n",
4621                                 cmdname(pi.cmd), pi.size);
4622                        goto err_out;
4623                }
4624
4625                if (shs) {
4626                        update_receiver_timing_details(connection, drbd_recv_all_warn);
4627                        err = drbd_recv_all_warn(connection, pi.data, shs);
4628                        if (err)
4629                                goto err_out;
4630                        pi.size -= shs;
4631                }
4632
4633                update_receiver_timing_details(connection, cmd->fn);
4634                err = cmd->fn(connection, &pi);
4635                if (err) {
4636                        drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4637                                 cmdname(pi.cmd), err, pi.size);
4638                        goto err_out;
4639                }
4640        }
4641        return;
4642
4643    err_out:
4644        conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4645}
4646
4647static void conn_disconnect(struct drbd_connection *connection)
4648{
4649        struct drbd_peer_device *peer_device;
4650        enum drbd_conns oc;
4651        int vnr;
4652
4653        if (connection->cstate == C_STANDALONE)
4654                return;
4655
4656        /* We are about to start the cleanup after connection loss.
4657         * Make sure drbd_make_request knows about that.
4658         * Usually we should be in some network failure state already,
4659         * but just in case we are not, we fix it up here.
4660         */
4661        conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4662
4663        /* asender does not clean up anything. it must not interfere, either */
4664        drbd_thread_stop(&connection->asender);
4665        drbd_free_sock(connection);
4666
4667        rcu_read_lock();
4668        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4669                struct drbd_device *device = peer_device->device;
4670                kref_get(&device->kref);
4671                rcu_read_unlock();
4672                drbd_disconnected(peer_device);
4673                kref_put(&device->kref, drbd_destroy_device);
4674                rcu_read_lock();
4675        }
4676        rcu_read_unlock();
4677
4678        if (!list_empty(&connection->current_epoch->list))
4679                drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
4680        /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4681        atomic_set(&connection->current_epoch->epoch_size, 0);
4682        connection->send.seen_any_write_yet = false;
4683
4684        drbd_info(connection, "Connection closed\n");
4685
4686        if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
4687                conn_try_outdate_peer_async(connection);
4688
4689        spin_lock_irq(&connection->resource->req_lock);
4690        oc = connection->cstate;
4691        if (oc >= C_UNCONNECTED)
4692                _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4693
4694        spin_unlock_irq(&connection->resource->req_lock);
4695
4696        if (oc == C_DISCONNECTING)
4697                conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4698}
4699
4700static int drbd_disconnected(struct drbd_peer_device *peer_device)
4701{
4702        struct drbd_device *device = peer_device->device;
4703        unsigned int i;
4704
4705        /* wait for current activity to cease. */
4706        spin_lock_irq(&device->resource->req_lock);
4707        _drbd_wait_ee_list_empty(device, &device->active_ee);
4708        _drbd_wait_ee_list_empty(device, &device->sync_ee);
4709        _drbd_wait_ee_list_empty(device, &device->read_ee);
4710        spin_unlock_irq(&device->resource->req_lock);
4711
4712        /* We do not have data structures that would allow us to
4713         * get the rs_pending_cnt down to 0 again.
4714         *  * On C_SYNC_TARGET we do not have any data structures describing
4715         *    the pending RSDataRequest's we have sent.
4716         *  * On C_SYNC_SOURCE there is no data structure that tracks
4717         *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4718         *  And no, it is not the sum of the reference counts in the
4719         *  resync_LRU. The resync_LRU tracks the whole operation including
4720         *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4721         *  on the fly. */
4722        drbd_rs_cancel_all(device);
4723        device->rs_total = 0;
4724        device->rs_failed = 0;
4725        atomic_set(&device->rs_pending_cnt, 0);
4726        wake_up(&device->misc_wait);
4727
4728        del_timer_sync(&device->resync_timer);
4729        resync_timer_fn((unsigned long)device);
4730
4731        /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4732         * w_make_resync_request etc. which may still be on the worker queue
4733         * to be "canceled" */
4734        drbd_flush_workqueue(&peer_device->connection->sender_work);
4735
4736        drbd_finish_peer_reqs(device);
4737
4738        /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4739           might have issued a work again. The one before drbd_finish_peer_reqs() is
4740           necessary to reclain net_ee in drbd_finish_peer_reqs(). */
4741        drbd_flush_workqueue(&peer_device->connection->sender_work);
4742
4743        /* need to do it again, drbd_finish_peer_reqs() may have populated it
4744         * again via drbd_try_clear_on_disk_bm(). */
4745        drbd_rs_cancel_all(device);
4746
4747        kfree(device->p_uuid);
4748        device->p_uuid = NULL;
4749
4750        if (!drbd_suspended(device))
4751                tl_clear(peer_device->connection);
4752
4753        drbd_md_sync(device);
4754
4755        /* serialize with bitmap writeout triggered by the state change,
4756         * if any. */
4757        wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags));
4758
4759        /* tcp_close and release of sendpage pages can be deferred.  I don't
4760         * want to use SO_LINGER, because apparently it can be deferred for
4761         * more than 20 seconds (longest time I checked).
4762         *
4763         * Actually we don't care for exactly when the network stack does its
4764         * put_page(), but release our reference on these pages right here.
4765         */
4766        i = drbd_free_peer_reqs(device, &device->net_ee);
4767        if (i)
4768                drbd_info(device, "net_ee not empty, killed %u entries\n", i);
4769        i = atomic_read(&device->pp_in_use_by_net);
4770        if (i)
4771                drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
4772        i = atomic_read(&device->pp_in_use);
4773        if (i)
4774                drbd_info(device, "pp_in_use = %d, expected 0\n", i);
4775
4776        D_ASSERT(device, list_empty(&device->read_ee));
4777        D_ASSERT(device, list_empty(&device->active_ee));
4778        D_ASSERT(device, list_empty(&device->sync_ee));
4779        D_ASSERT(device, list_empty(&device->done_ee));
4780
4781        return 0;
4782}
4783
4784/*
4785 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4786 * we can agree on is stored in agreed_pro_version.
4787 *
4788 * feature flags and the reserved array should be enough room for future
4789 * enhancements of the handshake protocol, and possible plugins...
4790 *
4791 * for now, they are expected to be zero, but ignored.
4792 */
4793static int drbd_send_features(struct drbd_connection *connection)
4794{
4795        struct drbd_socket *sock;
4796        struct p_connection_features *p;
4797
4798        sock = &connection->data;
4799        p = conn_prepare_command(connection, sock);
4800        if (!p)
4801                return -EIO;
4802        memset(p, 0, sizeof(*p));
4803        p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4804        p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4805        p->feature_flags = cpu_to_be32(PRO_FEATURES);
4806        return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4807}
4808
4809/*
4810 * return values:
4811 *   1 yes, we have a valid connection
4812 *   0 oops, did not work out, please try again
4813 *  -1 peer talks different language,
4814 *     no point in trying again, please go standalone.
4815 */
4816static int drbd_do_features(struct drbd_connection *connection)
4817{
4818        /* ASSERT current == connection->receiver ... */
4819        struct p_connection_features *p;
4820        const int expect = sizeof(struct p_connection_features);
4821        struct packet_info pi;
4822        int err;
4823
4824        err = drbd_send_features(connection);
4825        if (err)
4826                return 0;
4827
4828        err = drbd_recv_header(connection, &pi);
4829        if (err)
4830                return 0;
4831
4832        if (pi.cmd != P_CONNECTION_FEATURES) {
4833                drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4834                         cmdname(pi.cmd), pi.cmd);
4835                return -1;
4836        }
4837
4838        if (pi.size != expect) {
4839                drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
4840                     expect, pi.size);
4841                return -1;
4842        }
4843
4844        p = pi.data;
4845        err = drbd_recv_all_warn(connection, p, expect);
4846        if (err)
4847                return 0;
4848
4849        p->protocol_min = be32_to_cpu(p->protocol_min);
4850        p->protocol_max = be32_to_cpu(p->protocol_max);
4851        if (p->protocol_max == 0)
4852                p->protocol_max = p->protocol_min;
4853
4854        if (PRO_VERSION_MAX < p->protocol_min ||
4855            PRO_VERSION_MIN > p->protocol_max)
4856                goto incompat;
4857
4858        connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4859        connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
4860
4861        drbd_info(connection, "Handshake successful: "
4862             "Agreed network protocol version %d\n", connection->agreed_pro_version);
4863
4864        drbd_info(connection, "Agreed to%ssupport TRIM on protocol level\n",
4865                  connection->agreed_features & FF_TRIM ? " " : " not ");
4866
4867        return 1;
4868
4869 incompat:
4870        drbd_err(connection, "incompatible DRBD dialects: "
4871            "I support %d-%d, peer supports %d-%d\n",
4872            PRO_VERSION_MIN, PRO_VERSION_MAX,
4873            p->protocol_min, p->protocol_max);
4874        return -1;
4875}
4876
4877#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
/*
 * Variant for kernels built without HMAC support: authentication is
 * not possible, so log why and return -1.
 */
static int drbd_do_auth(struct drbd_connection *connection)
{
        const int auth_failed = -1;

        drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
        drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");

        return auth_failed;
}
4884#else
4885#define CHALLENGE_LEN 64
4886
4887/* Return value:
4888        1 - auth succeeded,
4889        0 - failed, try again (network error),
4890        -1 - auth failed, don't try again.
4891*/
4892
4893static int drbd_do_auth(struct drbd_connection *connection)
4894{
4895        struct drbd_socket *sock;
4896        char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4897        struct scatterlist sg;
4898        char *response = NULL;
4899        char *right_response = NULL;
4900        char *peers_ch = NULL;
4901        unsigned int key_len;
4902        char secret[SHARED_SECRET_MAX]; /* 64 byte */
4903        unsigned int resp_size;
4904        struct hash_desc desc;
4905        struct packet_info pi;
4906        struct net_conf *nc;
4907        int err, rv;
4908
4909        /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4910
4911        rcu_read_lock();
4912        nc = rcu_dereference(connection->net_conf);
4913        key_len = strlen(nc->shared_secret);
4914        memcpy(secret, nc->shared_secret, key_len);
4915        rcu_read_unlock();
4916
4917        desc.tfm = connection->cram_hmac_tfm;
4918        desc.flags = 0;
4919
4920        rv = crypto_hash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
4921        if (rv) {
4922                drbd_err(connection, "crypto_hash_setkey() failed with %d\n", rv);
4923                rv = -1;
4924                goto fail;
4925        }
4926
4927        get_random_bytes(my_challenge, CHALLENGE_LEN);
4928
4929        sock = &connection->data;
4930        if (!conn_prepare_command(connection, sock)) {
4931                rv = 0;
4932                goto fail;
4933        }
4934        rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
4935                                my_challenge, CHALLENGE_LEN);
4936        if (!rv)
4937                goto fail;
4938
4939        err = drbd_recv_header(connection, &pi);
4940        if (err) {
4941                rv = 0;
4942                goto fail;
4943        }
4944
4945        if (pi.cmd != P_AUTH_CHALLENGE) {
4946                drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4947                         cmdname(pi.cmd), pi.cmd);
4948                rv = 0;
4949                goto fail;
4950        }
4951
4952        if (pi.size > CHALLENGE_LEN * 2) {
4953                drbd_err(connection, "expected AuthChallenge payload too big.\n");
4954                rv = -1;
4955                goto fail;
4956        }
4957
4958        if (pi.size < CHALLENGE_LEN) {
4959                drbd_err(connection, "AuthChallenge payload too small.\n");
4960                rv = -1;
4961                goto fail;
4962        }
4963
4964        peers_ch = kmalloc(pi.size, GFP_NOIO);
4965        if (peers_ch == NULL) {
4966                drbd_err(connection, "kmalloc of peers_ch failed\n");
4967                rv = -1;
4968                goto fail;
4969        }
4970
4971        err = drbd_recv_all_warn(connection, peers_ch, pi.size);
4972        if (err) {
4973                rv = 0;
4974                goto fail;
4975        }
4976
4977        if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
4978                drbd_err(connection, "Peer presented the same challenge!\n");
4979                rv = -1;
4980                goto fail;
4981        }
4982
4983        resp_size = crypto_hash_digestsize(connection->cram_hmac_tfm);
4984        response = kmalloc(resp_size, GFP_NOIO);
4985        if (response == NULL) {
4986                drbd_err(connection, "kmalloc of response failed\n");
4987                rv = -1;
4988                goto fail;
4989        }
4990
4991        sg_init_table(&sg, 1);
4992        sg_set_buf(&sg, peers_ch, pi.size);
4993
4994        rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4995        if (rv) {
4996                drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
4997                rv = -1;
4998                goto fail;
4999        }
5000
5001        if (!conn_prepare_command(connection, sock)) {
5002                rv = 0;
5003                goto fail;
5004        }
5005        rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5006                                response, resp_size);
5007        if (!rv)
5008                goto fail;
5009
5010        err = drbd_recv_header(connection, &pi);
5011        if (err) {
5012                rv = 0;
5013                goto fail;
5014        }
5015
5016        if (pi.cmd != P_AUTH_RESPONSE) {
5017                drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5018                         cmdname(pi.cmd), pi.cmd);
5019                rv = 0;
5020                goto fail;
5021        }
5022
5023        if (pi.size != resp_size) {
5024                drbd_err(connection, "expected AuthResponse payload of wrong size\n");
5025                rv = 0;
5026                goto fail;
5027        }
5028
5029        err = drbd_recv_all_warn(connection, response , resp_size);
5030        if (err) {
5031                rv = 0;
5032                goto fail;
5033        }
5034
5035        right_response = kmalloc(resp_size, GFP_NOIO);
5036        if (right_response == NULL) {
5037                drbd_err(connection, "kmalloc of right_response failed\n");
5038                rv = -1;
5039                goto fail;
5040        }
5041
5042        sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
5043
5044        rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
5045        if (rv) {
5046                drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5047                rv = -1;
5048                goto fail;
5049        }
5050
5051        rv = !memcmp(response, right_response, resp_size);
5052
5053        if (rv)
5054                drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5055                     resp_size);
5056        else
5057                rv = -1;
5058
5059 fail:
5060        kfree(peers_ch);
5061        kfree(response);
5062        kfree(right_response);
5063
5064        return rv;
5065}
5066#endif
5067
5068int drbd_receiver(struct drbd_thread *thi)
5069{
5070        struct drbd_connection *connection = thi->connection;
5071        int h;
5072
5073        drbd_info(connection, "receiver (re)started\n");
5074
5075        do {
5076                h = conn_connect(connection);
5077                if (h == 0) {
5078                        conn_disconnect(connection);
5079                        schedule_timeout_interruptible(HZ);
5080                }
5081                if (h == -1) {
5082                        drbd_warn(connection, "Discarding network configuration.\n");
5083                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5084                }
5085        } while (h == 0);
5086
5087        if (h > 0)
5088                drbdd(connection);
5089
5090        conn_disconnect(connection);
5091
5092        drbd_info(connection, "receiver terminated\n");
5093        return 0;
5094}
5095
5096/* ********* acknowledge sender ******** */
5097
5098static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5099{
5100        struct p_req_state_reply *p = pi->data;
5101        int retcode = be32_to_cpu(p->retcode);
5102
5103        if (retcode >= SS_SUCCESS) {
5104                set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5105        } else {
5106                set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5107                drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5108                         drbd_set_st_err_str(retcode), retcode);
5109        }
5110        wake_up(&connection->ping_wait);
5111
5112        return 0;
5113}
5114
5115static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5116{
5117        struct drbd_peer_device *peer_device;
5118        struct drbd_device *device;
5119        struct p_req_state_reply *p = pi->data;
5120        int retcode = be32_to_cpu(p->retcode);
5121
5122        peer_device = conn_peer_device(connection, pi->vnr);
5123        if (!peer_device)
5124                return -EIO;
5125        device = peer_device->device;
5126
5127        if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5128                D_ASSERT(device, connection->agreed_pro_version < 100);
5129                return got_conn_RqSReply(connection, pi);
5130        }
5131
5132        if (retcode >= SS_SUCCESS) {
5133                set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5134        } else {
5135                set_bit(CL_ST_CHG_FAIL, &device->flags);
5136                drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5137                        drbd_set_st_err_str(retcode), retcode);
5138        }
5139        wake_up(&device->state_wait);
5140
5141        return 0;
5142}
5143
/*
 * Answer a ping from the peer; returns the result of
 * drbd_send_ping_ack().  @pi is not used.
 */
static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
{
        int err = drbd_send_ping_ack(connection);

        return err;
}
5149
5150static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5151{
5152        /* restore idle timeout */
5153        connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5154        if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5155                wake_up(&connection->ping_wait);
5156
5157        return 0;
5158}
5159
5160static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5161{
5162        struct drbd_peer_device *peer_device;
5163        struct drbd_device *device;
5164        struct p_block_ack *p = pi->data;
5165        sector_t sector = be64_to_cpu(p->sector);
5166        int blksize = be32_to_cpu(p->blksize);
5167
5168        peer_device = conn_peer_device(connection, pi->vnr);
5169        if (!peer_device)
5170                return -EIO;
5171        device = peer_device->device;
5172
5173        D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5174
5175        update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5176
5177        if (get_ldev(device)) {
5178                drbd_rs_complete_io(device, sector);
5179                drbd_set_in_sync(device, sector, blksize);
5180                /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5181                device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5182                put_ldev(device);
5183        }
5184        dec_rs_pending(device);
5185        atomic_add(blksize >> 9, &device->rs_sect_in);
5186
5187        return 0;
5188}
5189
5190static int
5191validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5192                              struct rb_root *root, const char *func,
5193                              enum drbd_req_event what, bool missing_ok)
5194{
5195        struct drbd_request *req;
5196        struct bio_and_error m;
5197
5198        spin_lock_irq(&device->resource->req_lock);
5199        req = find_request(device, root, id, sector, missing_ok, func);
5200        if (unlikely(!req)) {
5201                spin_unlock_irq(&device->resource->req_lock);
5202                return -EIO;
5203        }
5204        __req_mod(req, what, &m);
5205        spin_unlock_irq(&device->resource->req_lock);
5206
5207        if (m.bio)
5208                complete_master_bio(device, &m);
5209        return 0;
5210}
5211
5212static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5213{
5214        struct drbd_peer_device *peer_device;
5215        struct drbd_device *device;
5216        struct p_block_ack *p = pi->data;
5217        sector_t sector = be64_to_cpu(p->sector);
5218        int blksize = be32_to_cpu(p->blksize);
5219        enum drbd_req_event what;
5220
5221        peer_device = conn_peer_device(connection, pi->vnr);
5222        if (!peer_device)
5223                return -EIO;
5224        device = peer_device->device;
5225
5226        update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5227
5228        if (p->block_id == ID_SYNCER) {
5229                drbd_set_in_sync(device, sector, blksize);
5230                dec_rs_pending(device);
5231                return 0;
5232        }
5233        switch (pi->cmd) {
5234        case P_RS_WRITE_ACK:
5235                what = WRITE_ACKED_BY_PEER_AND_SIS;
5236                break;
5237        case P_WRITE_ACK:
5238                what = WRITE_ACKED_BY_PEER;
5239                break;
5240        case P_RECV_ACK:
5241                what = RECV_ACKED_BY_PEER;
5242                break;
5243        case P_SUPERSEDED:
5244                what = CONFLICT_RESOLVED;
5245                break;
5246        case P_RETRY_WRITE:
5247                what = POSTPONE_WRITE;
5248                break;
5249        default:
5250                BUG();
5251        }
5252
5253        return validate_req_change_req_state(device, p->block_id, sector,
5254                                             &device->write_requests, __func__,
5255                                             what, false);
5256}
5257
5258static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5259{
5260        struct drbd_peer_device *peer_device;
5261        struct drbd_device *device;
5262        struct p_block_ack *p = pi->data;
5263        sector_t sector = be64_to_cpu(p->sector);
5264        int size = be32_to_cpu(p->blksize);
5265        int err;
5266
5267        peer_device = conn_peer_device(connection, pi->vnr);
5268        if (!peer_device)
5269                return -EIO;
5270        device = peer_device->device;
5271
5272        update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5273
5274        if (p->block_id == ID_SYNCER) {
5275                dec_rs_pending(device);
5276                drbd_rs_failed_io(device, sector, size);
5277                return 0;
5278        }
5279
5280        err = validate_req_change_req_state(device, p->block_id, sector,
5281                                            &device->write_requests, __func__,
5282                                            NEG_ACKED, true);
5283        if (err) {
5284                /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5285                   The master bio might already be completed, therefore the
5286                   request is no longer in the collision hash. */
5287                /* In Protocol B we might already have got a P_RECV_ACK
5288                   but then get a P_NEG_ACK afterwards. */
5289                drbd_set_out_of_sync(device, sector, size);
5290        }
5291        return 0;
5292}
5293
5294static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5295{
5296        struct drbd_peer_device *peer_device;
5297        struct drbd_device *device;
5298        struct p_block_ack *p = pi->data;
5299        sector_t sector = be64_to_cpu(p->sector);
5300
5301        peer_device = conn_peer_device(connection, pi->vnr);
5302        if (!peer_device)
5303                return -EIO;
5304        device = peer_device->device;
5305
5306        update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5307
5308        drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5309            (unsigned long long)sector, be32_to_cpu(p->blksize));
5310
5311        return validate_req_change_req_state(device, p->block_id, sector,
5312                                             &device->read_requests, __func__,
5313                                             NEG_ACKED, false);
5314}
5315
5316static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5317{
5318        struct drbd_peer_device *peer_device;
5319        struct drbd_device *device;
5320        sector_t sector;
5321        int size;
5322        struct p_block_ack *p = pi->data;
5323
5324        peer_device = conn_peer_device(connection, pi->vnr);
5325        if (!peer_device)
5326                return -EIO;
5327        device = peer_device->device;
5328
5329        sector = be64_to_cpu(p->sector);
5330        size = be32_to_cpu(p->blksize);
5331
5332        update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5333
5334        dec_rs_pending(device);
5335
5336        if (get_ldev_if_state(device, D_FAILED)) {
5337                drbd_rs_complete_io(device, sector);
5338                switch (pi->cmd) {
5339                case P_NEG_RS_DREPLY:
5340                        drbd_rs_failed_io(device, sector, size);
5341                case P_RS_CANCEL:
5342                        break;
5343                default:
5344                        BUG();
5345                }
5346                put_ldev(device);
5347        }
5348
5349        return 0;
5350}
5351
5352static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5353{
5354        struct p_barrier_ack *p = pi->data;
5355        struct drbd_peer_device *peer_device;
5356        int vnr;
5357
5358        tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5359
5360        rcu_read_lock();
5361        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5362                struct drbd_device *device = peer_device->device;
5363
5364                if (device->state.conn == C_AHEAD &&
5365                    atomic_read(&device->ap_in_flight) == 0 &&
5366                    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5367                        device->start_resync_timer.expires = jiffies + HZ;
5368                        add_timer(&device->start_resync_timer);
5369                }
5370        }
5371        rcu_read_unlock();
5372
5373        return 0;
5374}
5375
5376static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5377{
5378        struct drbd_peer_device *peer_device;
5379        struct drbd_device *device;
5380        struct p_block_ack *p = pi->data;
5381        struct drbd_device_work *dw;
5382        sector_t sector;
5383        int size;
5384
5385        peer_device = conn_peer_device(connection, pi->vnr);
5386        if (!peer_device)
5387                return -EIO;
5388        device = peer_device->device;
5389
5390        sector = be64_to_cpu(p->sector);
5391        size = be32_to_cpu(p->blksize);
5392
5393        update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5394
5395        if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5396                drbd_ov_out_of_sync_found(device, sector, size);
5397        else
5398                ov_out_of_sync_print(device);
5399
5400        if (!get_ldev(device))
5401                return 0;
5402
5403        drbd_rs_complete_io(device, sector);
5404        dec_rs_pending(device);
5405
5406        --device->ov_left;
5407
5408        /* let's advance progress step marks only for every other megabyte */
5409        if ((device->ov_left & 0x200) == 0x200)
5410                drbd_advance_rs_marks(device, device->ov_left);
5411
5412        if (device->ov_left == 0) {
5413                dw = kmalloc(sizeof(*dw), GFP_NOIO);
5414                if (dw) {
5415                        dw->w.cb = w_ov_finished;
5416                        dw->device = device;
5417                        drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5418                } else {
5419                        drbd_err(device, "kmalloc(dw) failed.");
5420                        ov_out_of_sync_print(device);
5421                        drbd_resync_finished(device);
5422                }
5423        }
5424        put_ldev(device);
5425        return 0;
5426}
5427
/*
 * got_skip() - meta-socket handler that accepts a packet and ignores it.
 * @connection: unused
 * @pi: unused
 *
 * Return: always 0.
 */
static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
{
	return 0;
}
5432
5433static int connection_finish_peer_reqs(struct drbd_connection *connection)
5434{
5435        struct drbd_peer_device *peer_device;
5436        int vnr, not_empty = 0;
5437
5438        do {
5439                clear_bit(SIGNAL_ASENDER, &connection->flags);
5440                flush_signals(current);
5441
5442                rcu_read_lock();
5443                idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5444                        struct drbd_device *device = peer_device->device;
5445                        kref_get(&device->kref);
5446                        rcu_read_unlock();
5447                        if (drbd_finish_peer_reqs(device)) {
5448                                kref_put(&device->kref, drbd_destroy_device);
5449                                return 1;
5450                        }
5451                        kref_put(&device->kref, drbd_destroy_device);
5452                        rcu_read_lock();
5453                }
5454                set_bit(SIGNAL_ASENDER, &connection->flags);
5455
5456                spin_lock_irq(&connection->resource->req_lock);
5457                idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5458                        struct drbd_device *device = peer_device->device;
5459                        not_empty = !list_empty(&device->done_ee);
5460                        if (not_empty)
5461                                break;
5462                }
5463                spin_unlock_irq(&connection->resource->req_lock);
5464                rcu_read_unlock();
5465        } while (not_empty);
5466
5467        return 0;
5468}
5469
/*
 * One entry of the meta-socket dispatch table (asender_tbl).
 */
struct asender_cmd {
	/* Payload size expected after the packet header, in bytes. */
	size_t pkt_size;
	/* Handler for the packet; a non-zero return is treated as failure. */
	int (*fn)(struct drbd_connection *connection, struct packet_info *pi);
};
5474
5475static struct asender_cmd asender_tbl[] = {
5476        [P_PING]            = { 0, got_Ping },
5477        [P_PING_ACK]        = { 0, got_PingAck },
5478        [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5479        [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5480        [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5481        [P_SUPERSEDED]   = { sizeof(struct p_block_ack), got_BlockAck },
5482        [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5483        [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5484        [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5485        [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5486        [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5487        [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5488        [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5489        [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5490        [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5491        [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5492        [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5493};
5494
5495int drbd_asender(struct drbd_thread *thi)
5496{
5497        struct drbd_connection *connection = thi->connection;
5498        struct asender_cmd *cmd = NULL;
5499        struct packet_info pi;
5500        int rv;
5501        void *buf    = connection->meta.rbuf;
5502        int received = 0;
5503        unsigned int header_size = drbd_header_size(connection);
5504        int expect   = header_size;
5505        bool ping_timeout_active = false;
5506        struct net_conf *nc;
5507        int ping_timeo, tcp_cork, ping_int;
5508        struct sched_param param = { .sched_priority = 2 };
5509
5510        rv = sched_setscheduler(current, SCHED_RR, &param);
5511        if (rv < 0)
5512                drbd_err(connection, "drbd_asender: ERROR set priority, ret=%d\n", rv);
5513
5514        while (get_t_state(thi) == RUNNING) {
5515                drbd_thread_current_set_cpu(thi);
5516
5517                rcu_read_lock();
5518                nc = rcu_dereference(connection->net_conf);
5519                ping_timeo = nc->ping_timeo;
5520                tcp_cork = nc->tcp_cork;
5521                ping_int = nc->ping_int;
5522                rcu_read_unlock();
5523
5524                if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5525                        if (drbd_send_ping(connection)) {
5526                                drbd_err(connection, "drbd_send_ping has failed\n");
5527                                goto reconnect;
5528                        }
5529                        connection->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5530                        ping_timeout_active = true;
5531                }
5532
5533                /* TODO: conditionally cork; it may hurt latency if we cork without
5534                   much to send */
5535                if (tcp_cork)
5536                        drbd_tcp_cork(connection->meta.socket);
5537                if (connection_finish_peer_reqs(connection)) {
5538                        drbd_err(connection, "connection_finish_peer_reqs() failed\n");
5539                        goto reconnect;
5540                }
5541                /* but unconditionally uncork unless disabled */
5542                if (tcp_cork)
5543                        drbd_tcp_uncork(connection->meta.socket);
5544
5545                /* short circuit, recv_msg would return EINTR anyways. */
5546                if (signal_pending(current))
5547                        continue;
5548
5549                rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5550                clear_bit(SIGNAL_ASENDER, &connection->flags);
5551
5552                flush_signals(current);
5553
5554                /* Note:
5555                 * -EINTR        (on meta) we got a signal
5556                 * -EAGAIN       (on meta) rcvtimeo expired
5557                 * -ECONNRESET   other side closed the connection
5558                 * -ERESTARTSYS  (on data) we got a signal
5559                 * rv <  0       other than above: unexpected error!
5560                 * rv == expected: full header or command
5561                 * rv <  expected: "woken" by signal during receive
5562                 * rv == 0       : "connection shut down by peer"
5563                 */
5564received_more:
5565                if (likely(rv > 0)) {
5566                        received += rv;
5567                        buf      += rv;
5568                } else if (rv == 0) {
5569                        if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5570                                long t;
5571                                rcu_read_lock();
5572                                t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5573                                rcu_read_unlock();
5574
5575                                t = wait_event_timeout(connection->ping_wait,
5576                                                       connection->cstate < C_WF_REPORT_PARAMS,
5577                                                       t);
5578                                if (t)
5579                                        break;
5580                        }
5581                        drbd_err(connection, "meta connection shut down by peer.\n");
5582                        goto reconnect;
5583                } else if (rv == -EAGAIN) {
5584                        /* If the data socket received something meanwhile,
5585                         * that is good enough: peer is still alive. */
5586                        if (time_after(connection->last_received,
5587                                jiffies - connection->meta.socket->sk->sk_rcvtimeo))
5588                                continue;
5589                        if (ping_timeout_active) {
5590                                drbd_err(connection, "PingAck did not arrive in time.\n");
5591                                goto reconnect;
5592                        }
5593                        set_bit(SEND_PING, &connection->flags);
5594                        continue;
5595                } else if (rv == -EINTR) {
5596                        continue;
5597                } else {
5598                        drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5599                        goto reconnect;
5600                }
5601
5602                if (received == expect && cmd == NULL) {
5603                        if (decode_header(connection, connection->meta.rbuf, &pi))
5604                                goto reconnect;
5605                        cmd = &asender_tbl[pi.cmd];
5606                        if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5607                                drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5608                                         cmdname(pi.cmd), pi.cmd);
5609                                goto disconnect;
5610                        }
5611                        expect = header_size + cmd->pkt_size;
5612                        if (pi.size != expect - header_size) {
5613                                drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5614                                        pi.cmd, pi.size);
5615                                goto reconnect;
5616                        }
5617                }
5618                if (received == expect) {
5619                        bool err;
5620
5621                        err = cmd->fn(connection, &pi);
5622                        if (err) {
5623                                drbd_err(connection, "%pf failed\n", cmd->fn);
5624                                goto reconnect;
5625                        }
5626
5627                        connection->last_received = jiffies;
5628
5629                        if (cmd == &asender_tbl[P_PING_ACK]) {
5630                                /* restore idle timeout */
5631                                connection->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5632                                ping_timeout_active = false;
5633                        }
5634
5635                        buf      = connection->meta.rbuf;
5636                        received = 0;
5637                        expect   = header_size;
5638                        cmd      = NULL;
5639                }
5640                if (test_bit(SEND_PING, &connection->flags))
5641                        continue;
5642                rv = drbd_recv_short(connection->meta.socket, buf, expect-received, MSG_DONTWAIT);
5643                if (rv > 0)
5644                        goto received_more;
5645        }
5646
5647        if (0) {
5648reconnect:
5649                conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5650                conn_md_sync(connection);
5651        }
5652        if (0) {
5653disconnect:
5654                conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5655        }
5656        clear_bit(SIGNAL_ASENDER, &connection->flags);
5657
5658        drbd_info(connection, "asender terminated\n");
5659
5660        return 0;
5661}
5662