linux/drivers/block/drbd/drbd_receiver.c
<<
>>
Prefs
   1/*
   2   drbd_receiver.c
   3
   4   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
   5
   6   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   7   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   8   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
   9
  10   drbd is free software; you can redistribute it and/or modify
  11   it under the terms of the GNU General Public License as published by
  12   the Free Software Foundation; either version 2, or (at your option)
  13   any later version.
  14
  15   drbd is distributed in the hope that it will be useful,
  16   but WITHOUT ANY WARRANTY; without even the implied warranty of
  17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  18   GNU General Public License for more details.
  19
  20   You should have received a copy of the GNU General Public License
  21   along with drbd; see the file COPYING.  If not, write to
  22   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
  23 */
  24
  25
  26#include <linux/module.h>
  27
  28#include <linux/uaccess.h>
  29#include <net/sock.h>
  30
  31#include <linux/drbd.h>
  32#include <linux/fs.h>
  33#include <linux/file.h>
  34#include <linux/in.h>
  35#include <linux/mm.h>
  36#include <linux/memcontrol.h>
  37#include <linux/mm_inline.h>
  38#include <linux/slab.h>
  39#include <uapi/linux/sched/types.h>
  40#include <linux/sched/signal.h>
  41#include <linux/pkt_sched.h>
  42#define __KERNEL_SYSCALLS__
  43#include <linux/unistd.h>
  44#include <linux/vmalloc.h>
  45#include <linux/random.h>
  46#include <linux/string.h>
  47#include <linux/scatterlist.h>
  48#include "drbd_int.h"
  49#include "drbd_protocol.h"
  50#include "drbd_req.h"
  51#include "drbd_vli.h"
  52
  53#define PRO_FEATURES (DRBD_FF_TRIM|DRBD_FF_THIN_RESYNC|DRBD_FF_WSAME)
  54
  55struct packet_info {
  56        enum drbd_packet cmd;
  57        unsigned int size;
  58        unsigned int vnr;
  59        void *data;
  60};
  61
  62enum finish_epoch {
  63        FE_STILL_LIVE,
  64        FE_DESTROYED,
  65        FE_RECYCLED,
  66};
  67
  68static int drbd_do_features(struct drbd_connection *connection);
  69static int drbd_do_auth(struct drbd_connection *connection);
  70static int drbd_disconnected(struct drbd_peer_device *);
  71static void conn_wait_active_ee_empty(struct drbd_connection *connection);
  72static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
  73static int e_end_block(struct drbd_work *, int);
  74
  75
  76#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
  77
/*
 * Some helper functions to deal with singly linked page lists,
 * page->private being our "next" pointer.
 */
  82
  83/* If at least n pages are linked at head, get n pages off.
  84 * Otherwise, don't modify head, and return NULL.
  85 * Locking is the responsibility of the caller.
  86 */
  87static struct page *page_chain_del(struct page **head, int n)
  88{
  89        struct page *page;
  90        struct page *tmp;
  91
  92        BUG_ON(!n);
  93        BUG_ON(!head);
  94
  95        page = *head;
  96
  97        if (!page)
  98                return NULL;
  99
 100        while (page) {
 101                tmp = page_chain_next(page);
 102                if (--n == 0)
 103                        break; /* found sufficient pages */
 104                if (tmp == NULL)
 105                        /* insufficient pages, don't use any of them. */
 106                        return NULL;
 107                page = tmp;
 108        }
 109
 110        /* add end of list marker for the returned list */
 111        set_page_private(page, 0);
 112        /* actual return value, and adjustment of head */
 113        page = *head;
 114        *head = tmp;
 115        return page;
 116}
 117
 118/* may be used outside of locks to find the tail of a (usually short)
 119 * "private" page chain, before adding it back to a global chain head
 120 * with page_chain_add() under a spinlock. */
 121static struct page *page_chain_tail(struct page *page, int *len)
 122{
 123        struct page *tmp;
 124        int i = 1;
 125        while ((tmp = page_chain_next(page)))
 126                ++i, page = tmp;
 127        if (len)
 128                *len = i;
 129        return page;
 130}
 131
 132static int page_chain_free(struct page *page)
 133{
 134        struct page *tmp;
 135        int i = 0;
 136        page_chain_for_each_safe(page, tmp) {
 137                put_page(page);
 138                ++i;
 139        }
 140        return i;
 141}
 142
 143static void page_chain_add(struct page **head,
 144                struct page *chain_first, struct page *chain_last)
 145{
 146#if 1
 147        struct page *tmp;
 148        tmp = page_chain_tail(chain_first, NULL);
 149        BUG_ON(tmp != chain_last);
 150#endif
 151
 152        /* add chain to head */
 153        set_page_private(chain_last, (unsigned long)*head);
 154        *head = chain_first;
 155}
 156
 157static struct page *__drbd_alloc_pages(struct drbd_device *device,
 158                                       unsigned int number)
 159{
 160        struct page *page = NULL;
 161        struct page *tmp = NULL;
 162        unsigned int i = 0;
 163
 164        /* Yes, testing drbd_pp_vacant outside the lock is racy.
 165         * So what. It saves a spin_lock. */
 166        if (drbd_pp_vacant >= number) {
 167                spin_lock(&drbd_pp_lock);
 168                page = page_chain_del(&drbd_pp_pool, number);
 169                if (page)
 170                        drbd_pp_vacant -= number;
 171                spin_unlock(&drbd_pp_lock);
 172                if (page)
 173                        return page;
 174        }
 175
 176        /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
 177         * "criss-cross" setup, that might cause write-out on some other DRBD,
 178         * which in turn might block on the other node at this very place.  */
 179        for (i = 0; i < number; i++) {
 180                tmp = alloc_page(GFP_TRY);
 181                if (!tmp)
 182                        break;
 183                set_page_private(tmp, (unsigned long)page);
 184                page = tmp;
 185        }
 186
 187        if (i == number)
 188                return page;
 189
 190        /* Not enough pages immediately available this time.
 191         * No need to jump around here, drbd_alloc_pages will retry this
 192         * function "soon". */
 193        if (page) {
 194                tmp = page_chain_tail(page, NULL);
 195                spin_lock(&drbd_pp_lock);
 196                page_chain_add(&drbd_pp_pool, page, tmp);
 197                drbd_pp_vacant += i;
 198                spin_unlock(&drbd_pp_lock);
 199        }
 200        return NULL;
 201}
 202
 203static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
 204                                           struct list_head *to_be_freed)
 205{
 206        struct drbd_peer_request *peer_req, *tmp;
 207
 208        /* The EEs are always appended to the end of the list. Since
 209           they are sent in order over the wire, they have to finish
 210           in order. As soon as we see the first not finished we can
 211           stop to examine the list... */
 212
 213        list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
 214                if (drbd_peer_req_has_active_page(peer_req))
 215                        break;
 216                list_move(&peer_req->w.list, to_be_freed);
 217        }
 218}
 219
 220static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
 221{
 222        LIST_HEAD(reclaimed);
 223        struct drbd_peer_request *peer_req, *t;
 224
 225        spin_lock_irq(&device->resource->req_lock);
 226        reclaim_finished_net_peer_reqs(device, &reclaimed);
 227        spin_unlock_irq(&device->resource->req_lock);
 228        list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
 229                drbd_free_net_peer_req(device, peer_req);
 230}
 231
 232static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
 233{
 234        struct drbd_peer_device *peer_device;
 235        int vnr;
 236
 237        rcu_read_lock();
 238        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
 239                struct drbd_device *device = peer_device->device;
 240                if (!atomic_read(&device->pp_in_use_by_net))
 241                        continue;
 242
 243                kref_get(&device->kref);
 244                rcu_read_unlock();
 245                drbd_reclaim_net_peer_reqs(device);
 246                kref_put(&device->kref, drbd_destroy_device);
 247                rcu_read_lock();
 248        }
 249        rcu_read_unlock();
 250}
 251
 252/**
 253 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 254 * @device:     DRBD device.
 255 * @number:     number of pages requested
 256 * @retry:      whether to retry, if not enough pages are available right now
 257 *
 258 * Tries to allocate number pages, first from our own page pool, then from
 259 * the kernel.
 260 * Possibly retry until DRBD frees sufficient pages somewhere else.
 261 *
 262 * If this allocation would exceed the max_buffers setting, we throttle
 263 * allocation (schedule_timeout) to give the system some room to breathe.
 264 *
 265 * We do not use max-buffers as hard limit, because it could lead to
 266 * congestion and further to a distributed deadlock during online-verify or
 267 * (checksum based) resync, if the max-buffers, socket buffer sizes and
 268 * resync-rate settings are mis-configured.
 269 *
 270 * Returns a page chain linked via page->private.
 271 */
 272struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
 273                              bool retry)
 274{
 275        struct drbd_device *device = peer_device->device;
 276        struct page *page = NULL;
 277        struct net_conf *nc;
 278        DEFINE_WAIT(wait);
 279        unsigned int mxb;
 280
 281        rcu_read_lock();
 282        nc = rcu_dereference(peer_device->connection->net_conf);
 283        mxb = nc ? nc->max_buffers : 1000000;
 284        rcu_read_unlock();
 285
 286        if (atomic_read(&device->pp_in_use) < mxb)
 287                page = __drbd_alloc_pages(device, number);
 288
 289        /* Try to keep the fast path fast, but occasionally we need
 290         * to reclaim the pages we lended to the network stack. */
 291        if (page && atomic_read(&device->pp_in_use_by_net) > 512)
 292                drbd_reclaim_net_peer_reqs(device);
 293
 294        while (page == NULL) {
 295                prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
 296
 297                drbd_reclaim_net_peer_reqs(device);
 298
 299                if (atomic_read(&device->pp_in_use) < mxb) {
 300                        page = __drbd_alloc_pages(device, number);
 301                        if (page)
 302                                break;
 303                }
 304
 305                if (!retry)
 306                        break;
 307
 308                if (signal_pending(current)) {
 309                        drbd_warn(device, "drbd_alloc_pages interrupted!\n");
 310                        break;
 311                }
 312
 313                if (schedule_timeout(HZ/10) == 0)
 314                        mxb = UINT_MAX;
 315        }
 316        finish_wait(&drbd_pp_wait, &wait);
 317
 318        if (page)
 319                atomic_add(number, &device->pp_in_use);
 320        return page;
 321}
 322
 323/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 324 * Is also used from inside an other spin_lock_irq(&resource->req_lock);
 325 * Either links the page chain back to the global pool,
 326 * or returns all pages to the system. */
 327static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
 328{
 329        atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
 330        int i;
 331
 332        if (page == NULL)
 333                return;
 334
 335        if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * drbd_minor_count)
 336                i = page_chain_free(page);
 337        else {
 338                struct page *tmp;
 339                tmp = page_chain_tail(page, &i);
 340                spin_lock(&drbd_pp_lock);
 341                page_chain_add(&drbd_pp_pool, page, tmp);
 342                drbd_pp_vacant += i;
 343                spin_unlock(&drbd_pp_lock);
 344        }
 345        i = atomic_sub_return(i, a);
 346        if (i < 0)
 347                drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
 348                        is_net ? "pp_in_use_by_net" : "pp_in_use", i);
 349        wake_up(&drbd_pp_wait);
 350}
 351
 352/*
 353You need to hold the req_lock:
 354 _drbd_wait_ee_list_empty()
 355
 356You must not have the req_lock:
 357 drbd_free_peer_req()
 358 drbd_alloc_peer_req()
 359 drbd_free_peer_reqs()
 360 drbd_ee_fix_bhs()
 361 drbd_finish_peer_reqs()
 362 drbd_clear_done_ee()
 363 drbd_wait_ee_list_empty()
 364*/
 365
 366/* normal: payload_size == request size (bi_size)
 367 * w_same: payload_size == logical_block_size
 368 * trim: payload_size == 0 */
 369struct drbd_peer_request *
 370drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
 371                    unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local)
 372{
 373        struct drbd_device *device = peer_device->device;
 374        struct drbd_peer_request *peer_req;
 375        struct page *page = NULL;
 376        unsigned nr_pages = (payload_size + PAGE_SIZE -1) >> PAGE_SHIFT;
 377
 378        if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
 379                return NULL;
 380
 381        peer_req = mempool_alloc(&drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
 382        if (!peer_req) {
 383                if (!(gfp_mask & __GFP_NOWARN))
 384                        drbd_err(device, "%s: allocation failed\n", __func__);
 385                return NULL;
 386        }
 387
 388        if (nr_pages) {
 389                page = drbd_alloc_pages(peer_device, nr_pages,
 390                                        gfpflags_allow_blocking(gfp_mask));
 391                if (!page)
 392                        goto fail;
 393        }
 394
 395        memset(peer_req, 0, sizeof(*peer_req));
 396        INIT_LIST_HEAD(&peer_req->w.list);
 397        drbd_clear_interval(&peer_req->i);
 398        peer_req->i.size = request_size;
 399        peer_req->i.sector = sector;
 400        peer_req->submit_jif = jiffies;
 401        peer_req->peer_device = peer_device;
 402        peer_req->pages = page;
 403        /*
 404         * The block_id is opaque to the receiver.  It is not endianness
 405         * converted, and sent back to the sender unchanged.
 406         */
 407        peer_req->block_id = id;
 408
 409        return peer_req;
 410
 411 fail:
 412        mempool_free(peer_req, &drbd_ee_mempool);
 413        return NULL;
 414}
 415
 416void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
 417                       int is_net)
 418{
 419        might_sleep();
 420        if (peer_req->flags & EE_HAS_DIGEST)
 421                kfree(peer_req->digest);
 422        drbd_free_pages(device, peer_req->pages, is_net);
 423        D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
 424        D_ASSERT(device, drbd_interval_empty(&peer_req->i));
 425        if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
 426                peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
 427                drbd_al_complete_io(device, &peer_req->i);
 428        }
 429        mempool_free(peer_req, &drbd_ee_mempool);
 430}
 431
 432int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
 433{
 434        LIST_HEAD(work_list);
 435        struct drbd_peer_request *peer_req, *t;
 436        int count = 0;
 437        int is_net = list == &device->net_ee;
 438
 439        spin_lock_irq(&device->resource->req_lock);
 440        list_splice_init(list, &work_list);
 441        spin_unlock_irq(&device->resource->req_lock);
 442
 443        list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
 444                __drbd_free_peer_req(device, peer_req, is_net);
 445                count++;
 446        }
 447        return count;
 448}
 449
 450/*
 451 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
 452 */
 453static int drbd_finish_peer_reqs(struct drbd_device *device)
 454{
 455        LIST_HEAD(work_list);
 456        LIST_HEAD(reclaimed);
 457        struct drbd_peer_request *peer_req, *t;
 458        int err = 0;
 459
 460        spin_lock_irq(&device->resource->req_lock);
 461        reclaim_finished_net_peer_reqs(device, &reclaimed);
 462        list_splice_init(&device->done_ee, &work_list);
 463        spin_unlock_irq(&device->resource->req_lock);
 464
 465        list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
 466                drbd_free_net_peer_req(device, peer_req);
 467
 468        /* possible callbacks here:
 469         * e_end_block, and e_end_resync_block, e_send_superseded.
 470         * all ignore the last argument.
 471         */
 472        list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
 473                int err2;
 474
 475                /* list_del not necessary, next/prev members not touched */
 476                err2 = peer_req->w.cb(&peer_req->w, !!err);
 477                if (!err)
 478                        err = err2;
 479                drbd_free_peer_req(device, peer_req);
 480        }
 481        wake_up(&device->ee_wait);
 482
 483        return err;
 484}
 485
 486static void _drbd_wait_ee_list_empty(struct drbd_device *device,
 487                                     struct list_head *head)
 488{
 489        DEFINE_WAIT(wait);
 490
 491        /* avoids spin_lock/unlock
 492         * and calling prepare_to_wait in the fast path */
 493        while (!list_empty(head)) {
 494                prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
 495                spin_unlock_irq(&device->resource->req_lock);
 496                io_schedule();
 497                finish_wait(&device->ee_wait, &wait);
 498                spin_lock_irq(&device->resource->req_lock);
 499        }
 500}
 501
 502static void drbd_wait_ee_list_empty(struct drbd_device *device,
 503                                    struct list_head *head)
 504{
 505        spin_lock_irq(&device->resource->req_lock);
 506        _drbd_wait_ee_list_empty(device, head);
 507        spin_unlock_irq(&device->resource->req_lock);
 508}
 509
 510static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
 511{
 512        struct kvec iov = {
 513                .iov_base = buf,
 514                .iov_len = size,
 515        };
 516        struct msghdr msg = {
 517                .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
 518        };
 519        iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &iov, 1, size);
 520        return sock_recvmsg(sock, &msg, msg.msg_flags);
 521}
 522
 523static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
 524{
 525        int rv;
 526
 527        rv = drbd_recv_short(connection->data.socket, buf, size, 0);
 528
 529        if (rv < 0) {
 530                if (rv == -ECONNRESET)
 531                        drbd_info(connection, "sock was reset by peer\n");
 532                else if (rv != -ERESTARTSYS)
 533                        drbd_err(connection, "sock_recvmsg returned %d\n", rv);
 534        } else if (rv == 0) {
 535                if (test_bit(DISCONNECT_SENT, &connection->flags)) {
 536                        long t;
 537                        rcu_read_lock();
 538                        t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
 539                        rcu_read_unlock();
 540
 541                        t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
 542
 543                        if (t)
 544                                goto out;
 545                }
 546                drbd_info(connection, "sock was shut down by peer\n");
 547        }
 548
 549        if (rv != size)
 550                conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
 551
 552out:
 553        return rv;
 554}
 555
 556static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
 557{
 558        int err;
 559
 560        err = drbd_recv(connection, buf, size);
 561        if (err != size) {
 562                if (err >= 0)
 563                        err = -EIO;
 564        } else
 565                err = 0;
 566        return err;
 567}
 568
 569static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
 570{
 571        int err;
 572
 573        err = drbd_recv_all(connection, buf, size);
 574        if (err && !signal_pending(current))
 575                drbd_warn(connection, "short read (expected size %d)\n", (int)size);
 576        return err;
 577}
 578
 579/* quoting tcp(7):
 580 *   On individual connections, the socket buffer size must be set prior to the
 581 *   listen(2) or connect(2) calls in order to have it take effect.
 582 * This is our wrapper to do so.
 583 */
 584static void drbd_setbufsize(struct socket *sock, unsigned int snd,
 585                unsigned int rcv)
 586{
 587        /* open coded SO_SNDBUF, SO_RCVBUF */
 588        if (snd) {
 589                sock->sk->sk_sndbuf = snd;
 590                sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
 591        }
 592        if (rcv) {
 593                sock->sk->sk_rcvbuf = rcv;
 594                sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
 595        }
 596}
 597
 598static struct socket *drbd_try_connect(struct drbd_connection *connection)
 599{
 600        const char *what;
 601        struct socket *sock;
 602        struct sockaddr_in6 src_in6;
 603        struct sockaddr_in6 peer_in6;
 604        struct net_conf *nc;
 605        int err, peer_addr_len, my_addr_len;
 606        int sndbuf_size, rcvbuf_size, connect_int;
 607        int disconnect_on_error = 1;
 608
 609        rcu_read_lock();
 610        nc = rcu_dereference(connection->net_conf);
 611        if (!nc) {
 612                rcu_read_unlock();
 613                return NULL;
 614        }
 615        sndbuf_size = nc->sndbuf_size;
 616        rcvbuf_size = nc->rcvbuf_size;
 617        connect_int = nc->connect_int;
 618        rcu_read_unlock();
 619
 620        my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
 621        memcpy(&src_in6, &connection->my_addr, my_addr_len);
 622
 623        if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
 624                src_in6.sin6_port = 0;
 625        else
 626                ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
 627
 628        peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
 629        memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
 630
 631        what = "sock_create_kern";
 632        err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
 633                               SOCK_STREAM, IPPROTO_TCP, &sock);
 634        if (err < 0) {
 635                sock = NULL;
 636                goto out;
 637        }
 638
 639        sock->sk->sk_rcvtimeo =
 640        sock->sk->sk_sndtimeo = connect_int * HZ;
 641        drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
 642
 643       /* explicitly bind to the configured IP as source IP
 644        *  for the outgoing connections.
 645        *  This is needed for multihomed hosts and to be
 646        *  able to use lo: interfaces for drbd.
 647        * Make sure to use 0 as port number, so linux selects
 648        *  a free one dynamically.
 649        */
 650        what = "bind before connect";
 651        err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
 652        if (err < 0)
 653                goto out;
 654
 655        /* connect may fail, peer not yet available.
 656         * stay C_WF_CONNECTION, don't go Disconnecting! */
 657        disconnect_on_error = 0;
 658        what = "connect";
 659        err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
 660
 661out:
 662        if (err < 0) {
 663                if (sock) {
 664                        sock_release(sock);
 665                        sock = NULL;
 666                }
 667                switch (-err) {
 668                        /* timeout, busy, signal pending */
 669                case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
 670                case EINTR: case ERESTARTSYS:
 671                        /* peer not (yet) available, network problem */
 672                case ECONNREFUSED: case ENETUNREACH:
 673                case EHOSTDOWN:    case EHOSTUNREACH:
 674                        disconnect_on_error = 0;
 675                        break;
 676                default:
 677                        drbd_err(connection, "%s failed, err = %d\n", what, err);
 678                }
 679                if (disconnect_on_error)
 680                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
 681        }
 682
 683        return sock;
 684}
 685
 686struct accept_wait_data {
 687        struct drbd_connection *connection;
 688        struct socket *s_listen;
 689        struct completion door_bell;
 690        void (*original_sk_state_change)(struct sock *sk);
 691
 692};
 693
 694static void drbd_incoming_connection(struct sock *sk)
 695{
 696        struct accept_wait_data *ad = sk->sk_user_data;
 697        void (*state_change)(struct sock *sk);
 698
 699        state_change = ad->original_sk_state_change;
 700        if (sk->sk_state == TCP_ESTABLISHED)
 701                complete(&ad->door_bell);
 702        state_change(sk);
 703}
 704
 705static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
 706{
 707        int err, sndbuf_size, rcvbuf_size, my_addr_len;
 708        struct sockaddr_in6 my_addr;
 709        struct socket *s_listen;
 710        struct net_conf *nc;
 711        const char *what;
 712
 713        rcu_read_lock();
 714        nc = rcu_dereference(connection->net_conf);
 715        if (!nc) {
 716                rcu_read_unlock();
 717                return -EIO;
 718        }
 719        sndbuf_size = nc->sndbuf_size;
 720        rcvbuf_size = nc->rcvbuf_size;
 721        rcu_read_unlock();
 722
 723        my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
 724        memcpy(&my_addr, &connection->my_addr, my_addr_len);
 725
 726        what = "sock_create_kern";
 727        err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
 728                               SOCK_STREAM, IPPROTO_TCP, &s_listen);
 729        if (err) {
 730                s_listen = NULL;
 731                goto out;
 732        }
 733
 734        s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
 735        drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
 736
 737        what = "bind before listen";
 738        err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
 739        if (err < 0)
 740                goto out;
 741
 742        ad->s_listen = s_listen;
 743        write_lock_bh(&s_listen->sk->sk_callback_lock);
 744        ad->original_sk_state_change = s_listen->sk->sk_state_change;
 745        s_listen->sk->sk_state_change = drbd_incoming_connection;
 746        s_listen->sk->sk_user_data = ad;
 747        write_unlock_bh(&s_listen->sk->sk_callback_lock);
 748
 749        what = "listen";
 750        err = s_listen->ops->listen(s_listen, 5);
 751        if (err < 0)
 752                goto out;
 753
 754        return 0;
 755out:
 756        if (s_listen)
 757                sock_release(s_listen);
 758        if (err < 0) {
 759                if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
 760                        drbd_err(connection, "%s failed, err = %d\n", what, err);
 761                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
 762                }
 763        }
 764
 765        return -EIO;
 766}
 767
 768static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
 769{
 770        write_lock_bh(&sk->sk_callback_lock);
 771        sk->sk_state_change = ad->original_sk_state_change;
 772        sk->sk_user_data = NULL;
 773        write_unlock_bh(&sk->sk_callback_lock);
 774}
 775
 776static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
 777{
 778        int timeo, connect_int, err = 0;
 779        struct socket *s_estab = NULL;
 780        struct net_conf *nc;
 781
 782        rcu_read_lock();
 783        nc = rcu_dereference(connection->net_conf);
 784        if (!nc) {
 785                rcu_read_unlock();
 786                return NULL;
 787        }
 788        connect_int = nc->connect_int;
 789        rcu_read_unlock();
 790
 791        timeo = connect_int * HZ;
 792        /* 28.5% random jitter */
 793        timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
 794
 795        err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
 796        if (err <= 0)
 797                return NULL;
 798
 799        err = kernel_accept(ad->s_listen, &s_estab, 0);
 800        if (err < 0) {
 801                if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
 802                        drbd_err(connection, "accept failed, err = %d\n", err);
 803                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
 804                }
 805        }
 806
 807        if (s_estab)
 808                unregister_state_change(s_estab->sk, ad);
 809
 810        return s_estab;
 811}
 812
 813static int decode_header(struct drbd_connection *, void *, struct packet_info *);
 814
 815static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
 816                             enum drbd_packet cmd)
 817{
 818        if (!conn_prepare_command(connection, sock))
 819                return -EIO;
 820        return conn_send_command(connection, sock, cmd, 0, NULL, 0);
 821}
 822
 823static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
 824{
 825        unsigned int header_size = drbd_header_size(connection);
 826        struct packet_info pi;
 827        struct net_conf *nc;
 828        int err;
 829
 830        rcu_read_lock();
 831        nc = rcu_dereference(connection->net_conf);
 832        if (!nc) {
 833                rcu_read_unlock();
 834                return -EIO;
 835        }
 836        sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
 837        rcu_read_unlock();
 838
 839        err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
 840        if (err != header_size) {
 841                if (err >= 0)
 842                        err = -EIO;
 843                return err;
 844        }
 845        err = decode_header(connection, connection->data.rbuf, &pi);
 846        if (err)
 847                return err;
 848        return pi.cmd;
 849}
 850
 851/**
 852 * drbd_socket_okay() - Free the socket if its connection is not okay
 853 * @sock:       pointer to the pointer to the socket.
 854 */
 855static bool drbd_socket_okay(struct socket **sock)
 856{
 857        int rr;
 858        char tb[4];
 859
 860        if (!*sock)
 861                return false;
 862
 863        rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
 864
 865        if (rr > 0 || rr == -EAGAIN) {
 866                return true;
 867        } else {
 868                sock_release(*sock);
 869                *sock = NULL;
 870                return false;
 871        }
 872}
 873
 874static bool connection_established(struct drbd_connection *connection,
 875                                   struct socket **sock1,
 876                                   struct socket **sock2)
 877{
 878        struct net_conf *nc;
 879        int timeout;
 880        bool ok;
 881
 882        if (!*sock1 || !*sock2)
 883                return false;
 884
 885        rcu_read_lock();
 886        nc = rcu_dereference(connection->net_conf);
 887        timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
 888        rcu_read_unlock();
 889        schedule_timeout_interruptible(timeout);
 890
 891        ok = drbd_socket_okay(sock1);
 892        ok = drbd_socket_okay(sock2) && ok;
 893
 894        return ok;
 895}
 896
 897/* Gets called if a connection is established, or if a new minor gets created
 898   in a connection */
 899int drbd_connected(struct drbd_peer_device *peer_device)
 900{
 901        struct drbd_device *device = peer_device->device;
 902        int err;
 903
 904        atomic_set(&device->packet_seq, 0);
 905        device->peer_seq = 0;
 906
 907        device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
 908                &peer_device->connection->cstate_mutex :
 909                &device->own_state_mutex;
 910
 911        err = drbd_send_sync_param(peer_device);
 912        if (!err)
 913                err = drbd_send_sizes(peer_device, 0, 0);
 914        if (!err)
 915                err = drbd_send_uuids(peer_device);
 916        if (!err)
 917                err = drbd_send_current_state(peer_device);
 918        clear_bit(USE_DEGR_WFC_T, &device->flags);
 919        clear_bit(RESIZE_PENDING, &device->flags);
 920        atomic_set(&device->ap_in_flight, 0);
 921        mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
 922        return err;
 923}
 924
 925/*
     * Establish the data and the meta socket to the peer, then run the initial
     * handshake: feature negotiation, optional authentication, protocol
     * exchange and per-volume drbd_connected().
     *
 926 * return values:
 927 *   1 yes, we have a valid connection
 928 *   0 oops, did not work out, please try again
 929 *  -1 peer talks different language,
 930 *     no point in trying again, please go standalone.
 931 *  -2 We do not have a network config...
     *
     * Notes on the code below:
     *  - On success the positive value returned by drbd_do_features() is
     *    passed through unchanged.
     *  - -2 is returned when the request to enter C_WF_CONNECTION is refused.
     *  - -1 is also returned by the out_release_sockets path, after releasing
     *    the listen socket and whichever of the two sockets exist.
 932 */
 933static int conn_connect(struct drbd_connection *connection)
 934{
 935        struct drbd_socket sock, msock;
 936        struct drbd_peer_device *peer_device;
 937        struct net_conf *nc;
 938        int vnr, timeout, h;
 939        bool discard_my_data, ok;
 940        enum drbd_state_rv rv;
 941        struct accept_wait_data ad = {
 942                .connection = connection,
 943                .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
 944        };
 945
 946        clear_bit(DISCONNECT_SENT, &connection->flags);
 947        if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
 948                return -2;
 949
            /* Local socket descriptors that borrow the connection's buffers;
             * the sockets are copied into the connection once both are up. */
 950        mutex_init(&sock.mutex);
 951        sock.sbuf = connection->data.sbuf;
 952        sock.rbuf = connection->data.rbuf;
 953        sock.socket = NULL;
 954        mutex_init(&msock.mutex);
 955        msock.sbuf = connection->meta.sbuf;
 956        msock.rbuf = connection->meta.rbuf;
 957        msock.socket = NULL;
 958
 959        /* Assume that the peer only understands protocol 80 until we know better.  */
 960        connection->agreed_pro_version = 80;
 961
 962        if (prepare_listen_socket(connection, &ad))
 963                return 0;
 964
 965        do {
 966                struct socket *s;
 967
                    /* Outgoing attempt: the first socket we manage to connect
                     * becomes the data socket, the second the meta socket.
                     * The result of send_first_packet() is not checked here. */
 968                s = drbd_try_connect(connection);
 969                if (s) {
 970                        if (!sock.socket) {
 971                                sock.socket = s;
 972                                send_first_packet(connection, &sock, P_INITIAL_DATA);
 973                        } else if (!msock.socket) {
 974                                clear_bit(RESOLVE_CONFLICTS, &connection->flags);
 975                                msock.socket = s;
 976                                send_first_packet(connection, &msock, P_INITIAL_META);
 977                        } else {
                                    /* NOTE(review): s is stored in neither sock nor
                                     * msock here, and out_release_sockets does not
                                     * release it. */
 978                                drbd_err(connection, "Logic error in conn_connect()\n");
 979                                goto out_release_sockets;
 980                        }
 981                }
 982
 983                if (connection_established(connection, &sock.socket, &msock.socket))
 984                        break;
 985
 986retry:
                    /* Incoming attempt: the first packet received on an accepted
                     * socket tells whether the peer means it as data or meta
                     * socket. */
 987                s = drbd_wait_for_connect(connection, &ad);
 988                if (s) {
 989                        int fp = receive_first_packet(connection, s);
                            /* Drop our existing sockets if they died meanwhile. */
 990                        drbd_socket_okay(&sock.socket);
 991                        drbd_socket_okay(&msock.socket);
 992                        switch (fp) {
 993                        case P_INITIAL_DATA:
 994                                if (sock.socket) {
                                            /* Both sides connected at once: replace our
                                             * socket with the accepted one. */
 995                                        drbd_warn(connection, "initial packet S crossed\n");
 996                                        sock_release(sock.socket);
 997                                        sock.socket = s;
 998                                        goto randomize;
 999                                }
1000                                sock.socket = s;
1001                                break;
1002                        case P_INITIAL_META:
1003                                set_bit(RESOLVE_CONFLICTS, &connection->flags);
1004                                if (msock.socket) {
1005                                        drbd_warn(connection, "initial packet M crossed\n");
1006                                        sock_release(msock.socket);
1007                                        msock.socket = s;
1008                                        goto randomize;
1009                                }
1010                                msock.socket = s;
1011                                break;
1012                        default:
1013                                drbd_warn(connection, "Error receiving initial packet\n");
1014                                sock_release(s);
                                    /* Also the target of the "crossed" cases above:
                                     * with probability 1/2 wait for another incoming
                                     * connection right away. */
1015randomize:
1016                                if (prandom_u32() & 1)
1017                                        goto retry;
1018                        }
1019                }
1020
1021                if (connection->cstate <= C_DISCONNECTING)
1022                        goto out_release_sockets;
1023                if (signal_pending(current)) {
1024                        flush_signals(current);
1025                        smp_rmb();
1026                        if (get_t_state(&connection->receiver) == EXITING)
1027                                goto out_release_sockets;
1028                }
1029
1030                ok = connection_established(connection, &sock.socket, &msock.socket);
1031        } while (!ok);
1032
            /* Both sockets are up; the listen socket is no longer needed. */
1033        if (ad.s_listen)
1034                sock_release(ad.s_listen);
1035
1036        sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1037        msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1038
1039        sock.socket->sk->sk_allocation = GFP_NOIO;
1040        msock.socket->sk->sk_allocation = GFP_NOIO;
1041
1042        sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1043        msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1044
1045        /* NOT YET ...
1046         * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1047         * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1048         * first set it to the P_CONNECTION_FEATURES timeout,
1049         * which we set to 4x the configured ping_timeout. */
1050        rcu_read_lock();
1051        nc = rcu_dereference(connection->net_conf);
1052
1053        sock.socket->sk->sk_sndtimeo =
1054        sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1055
1056        msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1057        timeout = nc->timeout * HZ / 10;
1058        discard_my_data = nc->discard_my_data;
1059        rcu_read_unlock();
1060
1061        msock.socket->sk->sk_sndtimeo = timeout;
1062
1063        /* we don't want delays.
1064         * we use TCP_CORK where appropriate, though */
1065        drbd_tcp_nodelay(sock.socket);
1066        drbd_tcp_nodelay(msock.socket);
1067
            /* From here on the sockets are stored in the connection; the early
             * returns below do not release them in this function. */
1068        connection->data.socket = sock.socket;
1069        connection->meta.socket = msock.socket;
1070        connection->last_received = jiffies;
1071
1072        h = drbd_do_features(connection);
1073        if (h <= 0)
1074                return h;
1075
1076        if (connection->cram_hmac_tfm) {
1077                /* drbd_request_state(device, NS(conn, WFAuth)); */
1078                switch (drbd_do_auth(connection)) {
1079                case -1:
1080                        drbd_err(connection, "Authentication of peer failed\n");
1081                        return -1;
1082                case 0:
1083                        drbd_err(connection, "Authentication of peer failed, trying again.\n");
1084                        return 0;
1085                }
1086        }
1087
            /* Handshake timeouts are done with; switch the data socket to the
             * configured send timeout and an unbounded receive timeout. */
1088        connection->data.socket->sk->sk_sndtimeo = timeout;
1089        connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1090
1091        if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1092                return -1;
1093
1094        /* Prevent a race between resync-handshake and
1095         * being promoted to Primary.
1096         *
1097         * Grab and release the state mutex, so we know that any current
1098         * drbd_set_role() is finished, and any incoming drbd_set_role
1099         * will see the STATE_SENT flag, and wait for it to be cleared.
1100         */
1101        idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1102                mutex_lock(peer_device->device->state_mutex);
1103
1104        /* avoid a race with conn_request_state( C_DISCONNECTING ) */
1105        spin_lock_irq(&connection->resource->req_lock);
1106        set_bit(STATE_SENT, &connection->flags);
1107        spin_unlock_irq(&connection->resource->req_lock);
1108
1109        idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1110                mutex_unlock(peer_device->device->state_mutex);
1111
            /* Per volume: take a device reference so the RCU read lock can be
             * dropped around drbd_connected().  Its return value is not checked
             * here. */
1112        rcu_read_lock();
1113        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1114                struct drbd_device *device = peer_device->device;
1115                kref_get(&device->kref);
1116                rcu_read_unlock();
1117
1118                if (discard_my_data)
1119                        set_bit(DISCARD_MY_DATA, &device->flags);
1120                else
1121                        clear_bit(DISCARD_MY_DATA, &device->flags);
1122
1123                drbd_connected(peer_device);
1124                kref_put(&device->kref, drbd_destroy_device);
1125                rcu_read_lock();
1126        }
1127        rcu_read_unlock();
1128
1129        rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1130        if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1131                clear_bit(STATE_SENT, &connection->flags);
1132                return 0;
1133        }
1134
1135        drbd_thread_start(&connection->ack_receiver);
1136        /* opencoded create_singlethread_workqueue(),
1137         * to be able to use format string arguments */
1138        connection->ack_sender =
1139                alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
1140        if (!connection->ack_sender) {
1141                drbd_err(connection, "Failed to create workqueue ack_sender\n");
1142                return 0;
1143        }
1144
1145        mutex_lock(&connection->resource->conf_update);
1146        /* The discard_my_data flag is a single-shot modifier to the next
1147         * connection attempt, the handshake of which is now well underway.
1148         * No need for rcu style copying of the whole struct
1149         * just to clear a single value. */
1150        connection->net_conf->discard_my_data = 0;
1151        mutex_unlock(&connection->resource->conf_update);
1152
1153        return h;
1154
        /* Only reachable from inside the connect loop, i.e. before the sockets
         * have been handed over to the connection. */
1155out_release_sockets:
1156        if (ad.s_listen)
1157                sock_release(ad.s_listen);
1158        if (sock.socket)
1159                sock_release(sock.socket);
1160        if (msock.socket)
1161                sock_release(msock.socket);
1162        return -1;
1163}
1164
1165static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1166{
1167        unsigned int header_size = drbd_header_size(connection);
1168
1169        if (header_size == sizeof(struct p_header100) &&
1170            *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1171                struct p_header100 *h = header;
1172                if (h->pad != 0) {
1173                        drbd_err(connection, "Header padding is not zero\n");
1174                        return -EINVAL;
1175                }
1176                pi->vnr = be16_to_cpu(h->volume);
1177                pi->cmd = be16_to_cpu(h->command);
1178                pi->size = be32_to_cpu(h->length);
1179        } else if (header_size == sizeof(struct p_header95) &&
1180                   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1181                struct p_header95 *h = header;
1182                pi->cmd = be16_to_cpu(h->command);
1183                pi->size = be32_to_cpu(h->length);
1184                pi->vnr = 0;
1185        } else if (header_size == sizeof(struct p_header80) &&
1186                   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1187                struct p_header80 *h = header;
1188                pi->cmd = be16_to_cpu(h->command);
1189                pi->size = be16_to_cpu(h->length);
1190                pi->vnr = 0;
1191        } else {
1192                drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1193                         be32_to_cpu(*(__be32 *)header),
1194                         connection->agreed_pro_version);
1195                return -EINVAL;
1196        }
1197        pi->data = header + header_size;
1198        return 0;
1199}
1200
1201static void drbd_unplug_all_devices(struct drbd_connection *connection)
1202{
1203        if (current->plug == &connection->receiver_plug) {
1204                blk_finish_plug(&connection->receiver_plug);
1205                blk_start_plug(&connection->receiver_plug);
1206        } /* else: maybe just schedule() ?? */
1207}
1208
1209static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1210{
1211        void *buffer = connection->data.rbuf;
1212        int err;
1213
1214        err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1215        if (err)
1216                return err;
1217
1218        err = decode_header(connection, buffer, pi);
1219        connection->last_received = jiffies;
1220
1221        return err;
1222}
1223
1224static int drbd_recv_header_maybe_unplug(struct drbd_connection *connection, struct packet_info *pi)
1225{
1226        void *buffer = connection->data.rbuf;
1227        unsigned int size = drbd_header_size(connection);
1228        int err;
1229
1230        err = drbd_recv_short(connection->data.socket, buffer, size, MSG_NOSIGNAL|MSG_DONTWAIT);
1231        if (err != size) {
1232                /* If we have nothing in the receive buffer now, to reduce
1233                 * application latency, try to drain the backend queues as
1234                 * quickly as possible, and let remote TCP know what we have
1235                 * received so far. */
1236                if (err == -EAGAIN) {
1237                        drbd_tcp_quickack(connection->data.socket);
1238                        drbd_unplug_all_devices(connection);
1239                }
1240                if (err > 0) {
1241                        buffer += err;
1242                        size -= err;
1243                }
1244                err = drbd_recv_all_warn(connection, buffer, size);
1245                if (err)
1246                        return err;
1247        }
1248
1249        err = decode_header(connection, connection->data.rbuf, pi);
1250        connection->last_received = jiffies;
1251
1252        return err;
1253}
1254/* This is blkdev_issue_flush, but asynchronous.
1255 * We want to submit to all component volumes in parallel,
1256 * then wait for all completions.
     *
     * One issue_flush_context is shared by all flush bios of a single
     * drbd_flush() call; one one_flush_context is allocated per flush bio
     * and freed in its completion handler.
1257 */
1258struct issue_flush_context {
1259        atomic_t pending;       /* set to 1 by drbd_flush(), +1 per submitted flush bio */
1260        int error;              /* 0, or the most recently recorded flush error */
1261        struct completion done; /* completed by the endio that drops pending to zero */
1262};
1263struct one_flush_context {
1264        struct drbd_device *device;     /* device whose backing device is flushed */
1265        struct issue_flush_context *ctx; /* the shared context of this flush round */
1266};
1267
1268static void one_flush_endio(struct bio *bio)
1269{
1270        struct one_flush_context *octx = bio->bi_private;
1271        struct drbd_device *device = octx->device;
1272        struct issue_flush_context *ctx = octx->ctx;
1273
1274        if (bio->bi_status) {
1275                ctx->error = blk_status_to_errno(bio->bi_status);
1276                drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_status);
1277        }
1278        kfree(octx);
1279        bio_put(bio);
1280
1281        clear_bit(FLUSH_PENDING, &device->flags);
1282        put_ldev(device);
1283        kref_put(&device->kref, drbd_destroy_device);
1284
1285        if (atomic_dec_and_test(&ctx->pending))
1286                complete(&ctx->done);
1287}
1288
1289static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
1290{
1291        struct bio *bio = bio_alloc(GFP_NOIO, 0);
1292        struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);
1293        if (!bio || !octx) {
1294                drbd_warn(device, "Could not allocate a bio, CANNOT ISSUE FLUSH\n");
1295                /* FIXME: what else can I do now?  disconnecting or detaching
1296                 * really does not help to improve the state of the world, either.
1297                 */
1298                kfree(octx);
1299                if (bio)
1300                        bio_put(bio);
1301
1302                ctx->error = -ENOMEM;
1303                put_ldev(device);
1304                kref_put(&device->kref, drbd_destroy_device);
1305                return;
1306        }
1307
1308        octx->device = device;
1309        octx->ctx = ctx;
1310        bio_set_dev(bio, device->ldev->backing_bdev);
1311        bio->bi_private = octx;
1312        bio->bi_end_io = one_flush_endio;
1313        bio->bi_opf = REQ_OP_FLUSH | REQ_PREFLUSH;
1314
1315        device->flush_jif = jiffies;
1316        set_bit(FLUSH_PENDING, &device->flags);
1317        atomic_inc(&ctx->pending);
1318        submit_bio(bio);
1319}
1320
1321static void drbd_flush(struct drbd_connection *connection)
1322{
1323        if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1324                struct drbd_peer_device *peer_device;
1325                struct issue_flush_context ctx;
1326                int vnr;
1327
1328                atomic_set(&ctx.pending, 1);
1329                ctx.error = 0;
1330                init_completion(&ctx.done);
1331
1332                rcu_read_lock();
1333                idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1334                        struct drbd_device *device = peer_device->device;
1335
1336                        if (!get_ldev(device))
1337                                continue;
1338                        kref_get(&device->kref);
1339                        rcu_read_unlock();
1340
1341                        submit_one_flush(device, &ctx);
1342
1343                        rcu_read_lock();
1344                }
1345                rcu_read_unlock();
1346
1347                /* Do we want to add a timeout,
1348                 * if disk-timeout is set? */
1349                if (!atomic_dec_and_test(&ctx.pending))
1350                        wait_for_completion(&ctx.done);
1351
1352                if (ctx.error) {
1353                        /* would rather check on EOPNOTSUPP, but that is not reliable.
1354                         * don't try again for ANY return value != 0
1355                         * if (rv == -EOPNOTSUPP) */
1356                        /* Any error is already reported by bio_endio callback. */
1357                        drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1358                }
1359        }
1360}
1361
1362/**
1363 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1364 * @connection: DRBD connection.
1365 * @epoch:      Epoch object.
1366 * @ev:         Epoch event.
     *
     * @ev may carry the EV_CLEANUP flag in addition to the event proper; with
     * EV_CLEANUP set, no barrier ack is sent and a missing barrier number does
     * not keep the epoch from finishing.
     *
     * Returns FE_STILL_LIVE if no epoch was finished; otherwise FE_DESTROYED
     * or FE_RECYCLED, depending on whether the first epoch finished by this
     * call was freed or was the current epoch and got reset for reuse.
1367 */
1368static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1369                                               struct drbd_epoch *epoch,
1370                                               enum epoch_event ev)
1371{
1372        int epoch_size;
1373        struct drbd_epoch *next_epoch;
1374        enum finish_epoch rv = FE_STILL_LIVE;
1375
1376        spin_lock(&connection->epoch_lock);
1377        do {
1378                next_epoch = NULL;
1379
1380                epoch_size = atomic_read(&epoch->epoch_size);
1381
                    /* Apply the event itself, ignoring the EV_CLEANUP modifier. */
1382                switch (ev & ~EV_CLEANUP) {
1383                case EV_PUT:
1384                        atomic_dec(&epoch->active);
1385                        break;
1386                case EV_GOT_BARRIER_NR:
1387                        set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1388                        break;
1389                case EV_BECAME_LAST:
1390                        /* nothing to do*/
1391                        break;
1392                }
1393
                    /* The epoch is finished once it is non-empty, has no active
                     * requests left, and either has its barrier number or is being
                     * cleaned up. */
1394                if (epoch_size != 0 &&
1395                    atomic_read(&epoch->active) == 0 &&
1396                    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1397                        if (!(ev & EV_CLEANUP)) {
                                    /* epoch_lock is dropped while the barrier ack is sent. */
1398                                spin_unlock(&connection->epoch_lock);
1399                                drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1400                                spin_lock(&connection->epoch_lock);
1401                        }
1402#if 0
1403                        /* FIXME: dec unacked on connection, once we have
1404                         * something to count pending connection packets in. */
1405                        if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1406                                dec_unacked(epoch->connection);
1407#endif
1408
1409                        if (connection->current_epoch != epoch) {
                                    /* Not the current epoch: unlink and free it, then
                                     * re-evaluate its list successor with EV_BECAME_LAST
                                     * (keeping a possible EV_CLEANUP flag). */
1410                                next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1411                                list_del(&epoch->list);
1412                                ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1413                                connection->epochs--;
1414                                kfree(epoch);
1415
1416                                if (rv == FE_STILL_LIVE)
1417                                        rv = FE_DESTROYED;
1418                        } else {
                                    /* The current epoch is reset for reuse, not freed. */
1419                                epoch->flags = 0;
1420                                atomic_set(&epoch->epoch_size, 0);
1421                                /* atomic_set(&epoch->active, 0); is already zero */
1422                                if (rv == FE_STILL_LIVE)
1423                                        rv = FE_RECYCLED;
1424                        }
1425                }
1426
1427                if (!next_epoch)
1428                        break;
1429
1430                epoch = next_epoch;
1431        } while (1);
1432
1433        spin_unlock(&connection->epoch_lock);
1434
1435        return rv;
1436}
1437
1438static enum write_ordering_e
1439max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1440{
1441        struct disk_conf *dc;
1442
1443        dc = rcu_dereference(bdev->disk_conf);
1444
1445        if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1446                wo = WO_DRAIN_IO;
1447        if (wo == WO_DRAIN_IO && !dc->disk_drain)
1448                wo = WO_NONE;
1449
1450        return wo;
1451}
1452
1453/**
1454 * drbd_bump_write_ordering() - Fall back to an other write ordering method
1455 * @connection: DRBD connection.
1456 * @wo:         Write ordering method to try.
1457 */
1458void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1459                              enum write_ordering_e wo)
1460{
1461        struct drbd_device *device;
1462        enum write_ordering_e pwo;
1463        int vnr;
1464        static char *write_ordering_str[] = {
1465                [WO_NONE] = "none",
1466                [WO_DRAIN_IO] = "drain",
1467                [WO_BDEV_FLUSH] = "flush",
1468        };
1469
1470        pwo = resource->write_ordering;
1471        if (wo != WO_BDEV_FLUSH)
1472                wo = min(pwo, wo);
1473        rcu_read_lock();
1474        idr_for_each_entry(&resource->devices, device, vnr) {
1475                if (get_ldev(device)) {
1476                        wo = max_allowed_wo(device->ldev, wo);
1477                        if (device->ldev == bdev)
1478                                bdev = NULL;
1479                        put_ldev(device);
1480                }
1481        }
1482
1483        if (bdev)
1484                wo = max_allowed_wo(bdev, wo);
1485
1486        rcu_read_unlock();
1487
1488        resource->write_ordering = wo;
1489        if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1490                drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1491}
1492
1493static void drbd_issue_peer_discard(struct drbd_device *device, struct drbd_peer_request *peer_req)
1494{
1495        struct block_device *bdev = device->ldev->backing_bdev;
1496
1497        if (blkdev_issue_zeroout(bdev, peer_req->i.sector, peer_req->i.size >> 9,
1498                        GFP_NOIO, 0))
1499                peer_req->flags |= EE_WAS_ERROR;
1500
1501        drbd_endio_write_sec_final(peer_req);
1502}
1503
1504static void drbd_issue_peer_wsame(struct drbd_device *device,
1505                                  struct drbd_peer_request *peer_req)
1506{
1507        struct block_device *bdev = device->ldev->backing_bdev;
1508        sector_t s = peer_req->i.sector;
1509        sector_t nr = peer_req->i.size >> 9;
1510        if (blkdev_issue_write_same(bdev, s, nr, GFP_NOIO, peer_req->pages))
1511                peer_req->flags |= EE_WAS_ERROR;
1512        drbd_endio_write_sec_final(peer_req);
1513}
1514
1515
1516/**
1517 * drbd_submit_peer_request()
1518 * @device:     DRBD device.
1519 * @peer_req:   peer request
1520 * @op:         bio operation, passed to bio_set_op_attrs()
     * @op_flags:   operation flags, passed to bio_set_op_attrs()
     * @fault_type: passed through to drbd_generic_make_request()
1521 *
1522 * May spread the pages to multiple bios,
1523 * depending on bio_add_page restrictions.
     *
     * Requests flagged EE_IS_TRIM or EE_WRITE_SAME are not turned into bios
     * here; they are handled synchronously by drbd_issue_peer_discard() or
     * drbd_issue_peer_wsame(), and 0 is returned.
1524 *
1525 * Returns 0 if all bios have been submitted,
1526 * -ENOMEM if we could not allocate enough bios,
1527 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1528 *  single page to an empty bio (which should never happen and likely indicates
1529 *  that the lower level IO stack is in some way broken). This has been observed
1530 *  on certain Xen deployments.
     *
     * NOTE(review): the code below never sets -ENOSPC; a failing bio_add_page()
     * always just starts another bio.  The -ENOSPC description looks stale.
1531 */
1532/* TODO allocate from our own bio_set. */
1533int drbd_submit_peer_request(struct drbd_device *device,
1534                             struct drbd_peer_request *peer_req,
1535                             const unsigned op, const unsigned op_flags,
1536                             const int fault_type)
1537{
1538        struct bio *bios = NULL;
1539        struct bio *bio;
1540        struct page *page = peer_req->pages;
1541        sector_t sector = peer_req->i.sector;
1542        unsigned data_size = peer_req->i.size;
1543        unsigned n_bios = 0;
1544        unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1545        int err = -ENOMEM;
1546
1547        /* TRIM/DISCARD: for now, always use the helper function
1548         * blkdev_issue_zeroout(..., discard=true).
1549         * It's synchronous, but it does the right thing wrt. bio splitting.
1550         * Correctness first, performance later.  Next step is to code an
1551         * asynchronous variant of the same.
1552         */
1553        if (peer_req->flags & (EE_IS_TRIM|EE_WRITE_SAME)) {
1554                /* wait for all pending IO completions, before we start
1555                 * zeroing things out. */
1556                conn_wait_active_ee_empty(peer_req->peer_device->connection);
1557                /* add it to the active list now,
1558                 * so we can find it to present it in debugfs */
1559                peer_req->submit_jif = jiffies;
1560                peer_req->flags |= EE_SUBMITTED;
1561
1562                /* If this was a resync request from receive_rs_deallocated(),
1563                 * it is already on the sync_ee list */
1564                if (list_empty(&peer_req->w.list)) {
1565                        spin_lock_irq(&device->resource->req_lock);
1566                        list_add_tail(&peer_req->w.list, &device->active_ee);
1567                        spin_unlock_irq(&device->resource->req_lock);
1568                }
1569
1570                if (peer_req->flags & EE_IS_TRIM)
1571                        drbd_issue_peer_discard(device, peer_req);
1572                else /* EE_WRITE_SAME */
1573                        drbd_issue_peer_wsame(device, peer_req);
1574                return 0;
1575        }
1576
1577        /* In most cases, we will only need one bio.  But in case the lower
1578         * level restrictions happen to be different at this offset on this
1579         * side than those of the sending peer, we may need to submit the
1580         * request in more than one bio.
1581         *
1582         * Plain bio_alloc is good enough here, this is no DRBD internally
1583         * generated bio, but a bio allocated on behalf of the peer.
1584         */
1585next_bio:
1586        bio = bio_alloc(GFP_NOIO, nr_pages);
1587        if (!bio) {
1588                drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1589                goto fail;
1590        }
1591        /* > peer_req->i.sector, unless this is the first bio */
1592        bio->bi_iter.bi_sector = sector;
1593        bio_set_dev(bio, device->ldev->backing_bdev);
1594        bio_set_op_attrs(bio, op, op_flags);
1595        bio->bi_private = peer_req;
1596        bio->bi_end_io = drbd_peer_request_endio;
1597
            /* Collect the bios on a singly linked list (via bi_next); nothing is
             * submitted before all pages have found a bio. */
1598        bio->bi_next = bios;
1599        bios = bio;
1600        ++n_bios;
1601
            /* page, data_size, sector and nr_pages carry over when we jump back
             * to next_bio, so a new bio continues with the page that did not fit. */
1602        page_chain_for_each(page) {
1603                unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1604                if (!bio_add_page(bio, page, len, 0))
1605                        goto next_bio;
1606                data_size -= len;
1607                sector += len >> 9;
1608                --nr_pages;
1609        }
1610        D_ASSERT(device, data_size == 0);
1611        D_ASSERT(device, page == NULL);
1612
            /* Set the number of outstanding bios before the first one is submitted. */
1613        atomic_set(&peer_req->pending_bios, n_bios);
1614        /* for debugfs: update timestamp, mark as submitted */
1615        peer_req->submit_jif = jiffies;
1616        peer_req->flags |= EE_SUBMITTED;
1617        do {
1618                bio = bios;
1619                bios = bios->bi_next;
1620                bio->bi_next = NULL;
1621
1622                drbd_generic_make_request(device, fault_type, bio);
1623        } while (bios);
1624        return 0;
1625
        /* Nothing has been submitted yet; drop all bios allocated so far. */
1626fail:
1627        while (bios) {
1628                bio = bios;
1629                bios = bios->bi_next;
1630                bio_put(bio);
1631        }
1632        return err;
1633}
1634
1635static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1636                                             struct drbd_peer_request *peer_req)
1637{
1638        struct drbd_interval *i = &peer_req->i;
1639
1640        drbd_remove_interval(&device->write_requests, i);
1641        drbd_clear_interval(i);
1642
1643        /* Wake up any processes waiting for this peer request to complete.  */
1644        if (i->waiting)
1645                wake_up(&device->misc_wait);
1646}
1647
1648static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1649{
1650        struct drbd_peer_device *peer_device;
1651        int vnr;
1652
1653        rcu_read_lock();
1654        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1655                struct drbd_device *device = peer_device->device;
1656
1657                kref_get(&device->kref);
1658                rcu_read_unlock();
1659                drbd_wait_ee_list_empty(device, &device->active_ee);
1660                kref_put(&device->kref, drbd_destroy_device);
1661                rcu_read_lock();
1662        }
1663        rcu_read_unlock();
1664}
1665
1666static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1667{
1668        int rv;
1669        struct p_barrier *p = pi->data;
1670        struct drbd_epoch *epoch;
1671
1672        /* FIXME these are unacked on connection,
1673         * not a specific (peer)device.
1674         */
1675        connection->current_epoch->barrier_nr = p->barrier;
1676        connection->current_epoch->connection = connection;
1677        rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1678
1679        /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1680         * the activity log, which means it would not be resynced in case the
1681         * R_PRIMARY crashes now.
1682         * Therefore we must send the barrier_ack after the barrier request was
1683         * completed. */
1684        switch (connection->resource->write_ordering) {
1685        case WO_NONE:
1686                if (rv == FE_RECYCLED)
1687                        return 0;
1688
1689                /* receiver context, in the writeout path of the other node.
1690                 * avoid potential distributed deadlock */
1691                epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1692                if (epoch)
1693                        break;
1694                else
1695                        drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1696                        /* Fall through */
1697
1698        case WO_BDEV_FLUSH:
1699        case WO_DRAIN_IO:
1700                conn_wait_active_ee_empty(connection);
1701                drbd_flush(connection);
1702
1703                if (atomic_read(&connection->current_epoch->epoch_size)) {
1704                        epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1705                        if (epoch)
1706                                break;
1707                }
1708
1709                return 0;
1710        default:
1711                drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1712                         connection->resource->write_ordering);
1713                return -EIO;
1714        }
1715
1716        epoch->flags = 0;
1717        atomic_set(&epoch->epoch_size, 0);
1718        atomic_set(&epoch->active, 0);
1719
1720        spin_lock(&connection->epoch_lock);
1721        if (atomic_read(&connection->current_epoch->epoch_size)) {
1722                list_add(&epoch->list, &connection->current_epoch->list);
1723                connection->current_epoch = epoch;
1724                connection->epochs++;
1725        } else {
1726                /* The current_epoch got recycled while we allocated this one... */
1727                kfree(epoch);
1728        }
1729        spin_unlock(&connection->epoch_lock);
1730
1731        return 0;
1732}
1733
1734/* quick wrapper in case payload size != request_size (write same) */
1735static void drbd_csum_ee_size(struct crypto_ahash *h,
1736                              struct drbd_peer_request *r, void *d,
1737                              unsigned int payload_size)
1738{
1739        unsigned int tmp = r->i.size;
1740        r->i.size = payload_size;
1741        drbd_csum_ee(h, r, d);
1742        r->i.size = tmp;
1743}
1744
1745/* used from receive_RSDataReply (recv_resync_read)
1746 * and from receive_Data.
1747 * data_size: actual payload ("data in")
1748 *      for normal writes that is bi_size.
1749 *      for discards, that is zero.
1750 *      for write same, it is logical_block_size.
1751 * both trim and write same have the bi_size ("data len to be affected")
1752 * as extra argument in the packet header.
1753 */
1754static struct drbd_peer_request *
1755read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1756              struct packet_info *pi) __must_hold(local)
1757{
1758        struct drbd_device *device = peer_device->device;
1759        const sector_t capacity = drbd_get_capacity(device->this_bdev);
1760        struct drbd_peer_request *peer_req;
1761        struct page *page;
1762        int digest_size, err;
1763        unsigned int data_size = pi->size, ds;
1764        void *dig_in = peer_device->connection->int_dig_in;
1765        void *dig_vv = peer_device->connection->int_dig_vv;
1766        unsigned long *data;
1767        struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1768        struct p_trim *wsame = (pi->cmd == P_WSAME) ? pi->data : NULL;
1769
1770        digest_size = 0;
1771        if (!trim && peer_device->connection->peer_integrity_tfm) {
1772                digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1773                /*
1774                 * FIXME: Receive the incoming digest into the receive buffer
1775                 *        here, together with its struct p_data?
1776                 */
1777                err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1778                if (err)
1779                        return NULL;
1780                data_size -= digest_size;
1781        }
1782
1783        /* assume request_size == data_size, but special case trim and wsame. */
1784        ds = data_size;
1785        if (trim) {
1786                if (!expect(data_size == 0))
1787                        return NULL;
1788                ds = be32_to_cpu(trim->size);
1789        } else if (wsame) {
1790                if (data_size != queue_logical_block_size(device->rq_queue)) {
1791                        drbd_err(peer_device, "data size (%u) != drbd logical block size (%u)\n",
1792                                data_size, queue_logical_block_size(device->rq_queue));
1793                        return NULL;
1794                }
1795                if (data_size != bdev_logical_block_size(device->ldev->backing_bdev)) {
1796                        drbd_err(peer_device, "data size (%u) != backend logical block size (%u)\n",
1797                                data_size, bdev_logical_block_size(device->ldev->backing_bdev));
1798                        return NULL;
1799                }
1800                ds = be32_to_cpu(wsame->size);
1801        }
1802
1803        if (!expect(IS_ALIGNED(ds, 512)))
1804                return NULL;
1805        if (trim || wsame) {
1806                if (!expect(ds <= (DRBD_MAX_BBIO_SECTORS << 9)))
1807                        return NULL;
1808        } else if (!expect(ds <= DRBD_MAX_BIO_SIZE))
1809                return NULL;
1810
1811        /* even though we trust out peer,
1812         * we sometimes have to double check. */
1813        if (sector + (ds>>9) > capacity) {
1814                drbd_err(device, "request from peer beyond end of local disk: "
1815                        "capacity: %llus < sector: %llus + size: %u\n",
1816                        (unsigned long long)capacity,
1817                        (unsigned long long)sector, ds);
1818                return NULL;
1819        }
1820
1821        /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1822         * "criss-cross" setup, that might cause write-out on some other DRBD,
1823         * which in turn might block on the other node at this very place.  */
1824        peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO);
1825        if (!peer_req)
1826                return NULL;
1827
1828        peer_req->flags |= EE_WRITE;
1829        if (trim) {
1830                peer_req->flags |= EE_IS_TRIM;
1831                return peer_req;
1832        }
1833        if (wsame)
1834                peer_req->flags |= EE_WRITE_SAME;
1835
1836        /* receive payload size bytes into page chain */
1837        ds = data_size;
1838        page = peer_req->pages;
1839        page_chain_for_each(page) {
1840                unsigned len = min_t(int, ds, PAGE_SIZE);
1841                data = kmap(page);
1842                err = drbd_recv_all_warn(peer_device->connection, data, len);
1843                if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1844                        drbd_err(device, "Fault injection: Corrupting data on receive\n");
1845                        data[0] = data[0] ^ (unsigned long)-1;
1846                }
1847                kunmap(page);
1848                if (err) {
1849                        drbd_free_peer_req(device, peer_req);
1850                        return NULL;
1851                }
1852                ds -= len;
1853        }
1854
1855        if (digest_size) {
1856                drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size);
1857                if (memcmp(dig_in, dig_vv, digest_size)) {
1858                        drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1859                                (unsigned long long)sector, data_size);
1860                        drbd_free_peer_req(device, peer_req);
1861                        return NULL;
1862                }
1863        }
1864        device->recv_cnt += data_size >> 9;
1865        return peer_req;
1866}
1867
1868/* drbd_drain_block() just takes a data block
1869 * out of the socket input buffer, and discards it.
1870 */
1871static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1872{
1873        struct page *page;
1874        int err = 0;
1875        void *data;
1876
1877        if (!data_size)
1878                return 0;
1879
1880        page = drbd_alloc_pages(peer_device, 1, 1);
1881
1882        data = kmap(page);
1883        while (data_size) {
1884                unsigned int len = min_t(int, data_size, PAGE_SIZE);
1885
1886                err = drbd_recv_all_warn(peer_device->connection, data, len);
1887                if (err)
1888                        break;
1889                data_size -= len;
1890        }
1891        kunmap(page);
1892        drbd_free_pages(peer_device->device, page, 0);
1893        return err;
1894}
1895
1896static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1897                           sector_t sector, int data_size)
1898{
1899        struct bio_vec bvec;
1900        struct bvec_iter iter;
1901        struct bio *bio;
1902        int digest_size, err, expect;
1903        void *dig_in = peer_device->connection->int_dig_in;
1904        void *dig_vv = peer_device->connection->int_dig_vv;
1905
1906        digest_size = 0;
1907        if (peer_device->connection->peer_integrity_tfm) {
1908                digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1909                err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1910                if (err)
1911                        return err;
1912                data_size -= digest_size;
1913        }
1914
1915        /* optimistically update recv_cnt.  if receiving fails below,
1916         * we disconnect anyways, and counters will be reset. */
1917        peer_device->device->recv_cnt += data_size>>9;
1918
1919        bio = req->master_bio;
1920        D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1921
1922        bio_for_each_segment(bvec, bio, iter) {
1923                void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1924                expect = min_t(int, data_size, bvec.bv_len);
1925                err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1926                kunmap(bvec.bv_page);
1927                if (err)
1928                        return err;
1929                data_size -= expect;
1930        }
1931
1932        if (digest_size) {
1933                drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1934                if (memcmp(dig_in, dig_vv, digest_size)) {
1935                        drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1936                        return -EINVAL;
1937                }
1938        }
1939
1940        D_ASSERT(peer_device->device, data_size == 0);
1941        return 0;
1942}
1943
1944/*
1945 * e_end_resync_block() is called in ack_sender context via
1946 * drbd_finish_peer_reqs().
1947 */
1948static int e_end_resync_block(struct drbd_work *w, int unused)
1949{
1950        struct drbd_peer_request *peer_req =
1951                container_of(w, struct drbd_peer_request, w);
1952        struct drbd_peer_device *peer_device = peer_req->peer_device;
1953        struct drbd_device *device = peer_device->device;
1954        sector_t sector = peer_req->i.sector;
1955        int err;
1956
1957        D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1958
1959        if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1960                drbd_set_in_sync(device, sector, peer_req->i.size);
1961                err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1962        } else {
1963                /* Record failure to sync */
1964                drbd_rs_failed_io(device, sector, peer_req->i.size);
1965
1966                err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1967        }
1968        dec_unacked(device);
1969
1970        return err;
1971}
1972
1973static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1974                            struct packet_info *pi) __releases(local)
1975{
1976        struct drbd_device *device = peer_device->device;
1977        struct drbd_peer_request *peer_req;
1978
1979        peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
1980        if (!peer_req)
1981                goto fail;
1982
1983        dec_rs_pending(device);
1984
1985        inc_unacked(device);
1986        /* corresponding dec_unacked() in e_end_resync_block()
1987         * respective _drbd_clear_done_ee */
1988
1989        peer_req->w.cb = e_end_resync_block;
1990        peer_req->submit_jif = jiffies;
1991
1992        spin_lock_irq(&device->resource->req_lock);
1993        list_add_tail(&peer_req->w.list, &device->sync_ee);
1994        spin_unlock_irq(&device->resource->req_lock);
1995
1996        atomic_add(pi->size >> 9, &device->rs_sect_ev);
1997        if (drbd_submit_peer_request(device, peer_req, REQ_OP_WRITE, 0,
1998                                     DRBD_FAULT_RS_WR) == 0)
1999                return 0;
2000
2001        /* don't care for the reason here */
2002        drbd_err(device, "submit failed, triggering re-connect\n");
2003        spin_lock_irq(&device->resource->req_lock);
2004        list_del(&peer_req->w.list);
2005        spin_unlock_irq(&device->resource->req_lock);
2006
2007        drbd_free_peer_req(device, peer_req);
2008fail:
2009        put_ldev(device);
2010        return -EIO;
2011}
2012
2013static struct drbd_request *
2014find_request(struct drbd_device *device, struct rb_root *root, u64 id,
2015             sector_t sector, bool missing_ok, const char *func)
2016{
2017        struct drbd_request *req;
2018
2019        /* Request object according to our peer */
2020        req = (struct drbd_request *)(unsigned long)id;
2021        if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
2022                return req;
2023        if (!missing_ok) {
2024                drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
2025                        (unsigned long)id, (unsigned long long)sector);
2026        }
2027        return NULL;
2028}
2029
2030static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
2031{
2032        struct drbd_peer_device *peer_device;
2033        struct drbd_device *device;
2034        struct drbd_request *req;
2035        sector_t sector;
2036        int err;
2037        struct p_data *p = pi->data;
2038
2039        peer_device = conn_peer_device(connection, pi->vnr);
2040        if (!peer_device)
2041                return -EIO;
2042        device = peer_device->device;
2043
2044        sector = be64_to_cpu(p->sector);
2045
2046        spin_lock_irq(&device->resource->req_lock);
2047        req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
2048        spin_unlock_irq(&device->resource->req_lock);
2049        if (unlikely(!req))
2050                return -EIO;
2051
2052        /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
2053         * special casing it there for the various failure cases.
2054         * still no race with drbd_fail_pending_reads */
2055        err = recv_dless_read(peer_device, req, sector, pi->size);
2056        if (!err)
2057                req_mod(req, DATA_RECEIVED);
2058        /* else: nothing. handled from drbd_disconnect...
2059         * I don't think we may complete this just yet
2060         * in case we are "on-disconnect: freeze" */
2061
2062        return err;
2063}
2064
2065static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
2066{
2067        struct drbd_peer_device *peer_device;
2068        struct drbd_device *device;
2069        sector_t sector;
2070        int err;
2071        struct p_data *p = pi->data;
2072
2073        peer_device = conn_peer_device(connection, pi->vnr);
2074        if (!peer_device)
2075                return -EIO;
2076        device = peer_device->device;
2077
2078        sector = be64_to_cpu(p->sector);
2079        D_ASSERT(device, p->block_id == ID_SYNCER);
2080
2081        if (get_ldev(device)) {
2082                /* data is submitted to disk within recv_resync_read.
2083                 * corresponding put_ldev done below on error,
2084                 * or in drbd_peer_request_endio. */
2085                err = recv_resync_read(peer_device, sector, pi);
2086        } else {
2087                if (__ratelimit(&drbd_ratelimit_state))
2088                        drbd_err(device, "Can not write resync data to local disk.\n");
2089
2090                err = drbd_drain_block(peer_device, pi->size);
2091
2092                drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2093        }
2094
2095        atomic_add(pi->size >> 9, &device->rs_sect_in);
2096
2097        return err;
2098}
2099
2100static void restart_conflicting_writes(struct drbd_device *device,
2101                                       sector_t sector, int size)
2102{
2103        struct drbd_interval *i;
2104        struct drbd_request *req;
2105
2106        drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2107                if (!i->local)
2108                        continue;
2109                req = container_of(i, struct drbd_request, i);
2110                if (req->rq_state & RQ_LOCAL_PENDING ||
2111                    !(req->rq_state & RQ_POSTPONED))
2112                        continue;
2113                /* as it is RQ_POSTPONED, this will cause it to
2114                 * be queued on the retry workqueue. */
2115                __req_mod(req, CONFLICT_RESOLVED, NULL);
2116        }
2117}
2118
2119/*
2120 * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
2121 */
2122static int e_end_block(struct drbd_work *w, int cancel)
2123{
2124        struct drbd_peer_request *peer_req =
2125                container_of(w, struct drbd_peer_request, w);
2126        struct drbd_peer_device *peer_device = peer_req->peer_device;
2127        struct drbd_device *device = peer_device->device;
2128        sector_t sector = peer_req->i.sector;
2129        int err = 0, pcmd;
2130
2131        if (peer_req->flags & EE_SEND_WRITE_ACK) {
2132                if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2133                        pcmd = (device->state.conn >= C_SYNC_SOURCE &&
2134                                device->state.conn <= C_PAUSED_SYNC_T &&
2135                                peer_req->flags & EE_MAY_SET_IN_SYNC) ?
2136                                P_RS_WRITE_ACK : P_WRITE_ACK;
2137                        err = drbd_send_ack(peer_device, pcmd, peer_req);
2138                        if (pcmd == P_RS_WRITE_ACK)
2139                                drbd_set_in_sync(device, sector, peer_req->i.size);
2140                } else {
2141                        err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2142                        /* we expect it to be marked out of sync anyways...
2143                         * maybe assert this?  */
2144                }
2145                dec_unacked(device);
2146        }
2147
2148        /* we delete from the conflict detection hash _after_ we sent out the
2149         * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
2150        if (peer_req->flags & EE_IN_INTERVAL_TREE) {
2151                spin_lock_irq(&device->resource->req_lock);
2152                D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
2153                drbd_remove_epoch_entry_interval(device, peer_req);
2154                if (peer_req->flags & EE_RESTART_REQUESTS)
2155                        restart_conflicting_writes(device, sector, peer_req->i.size);
2156                spin_unlock_irq(&device->resource->req_lock);
2157        } else
2158                D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2159
2160        drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
2161
2162        return err;
2163}
2164
2165static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2166{
2167        struct drbd_peer_request *peer_req =
2168                container_of(w, struct drbd_peer_request, w);
2169        struct drbd_peer_device *peer_device = peer_req->peer_device;
2170        int err;
2171
2172        err = drbd_send_ack(peer_device, ack, peer_req);
2173        dec_unacked(peer_device->device);
2174
2175        return err;
2176}
2177
2178static int e_send_superseded(struct drbd_work *w, int unused)
2179{
2180        return e_send_ack(w, P_SUPERSEDED);
2181}
2182
2183static int e_send_retry_write(struct drbd_work *w, int unused)
2184{
2185        struct drbd_peer_request *peer_req =
2186                container_of(w, struct drbd_peer_request, w);
2187        struct drbd_connection *connection = peer_req->peer_device->connection;
2188
2189        return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2190                             P_RETRY_WRITE : P_SUPERSEDED);
2191}
2192
/*
 * Sequence number comparison with 32-bit wrap-around: returns true if @a is
 * "after" @b, i.e. if the distance from @b to @a, taken modulo 2^32 and
 * interpreted as a signed 32-bit value, is positive.
 *
 * The subtraction is done on the unsigned values, where wrap-around is well
 * defined, and only the result is reinterpreted as signed.  Subtracting two
 * signed values instead could overflow.
 *
 * For 24-bit wrap-around, we would have to shift:
 *  a <<= 8; b <<= 8;
 */
static bool seq_greater(u32 a, u32 b)
{
        return (s32)(a - b) > 0;
}
2202
2203static u32 seq_max(u32 a, u32 b)
2204{
2205        return seq_greater(a, b) ? a : b;
2206}
2207
2208static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2209{
2210        struct drbd_device *device = peer_device->device;
2211        unsigned int newest_peer_seq;
2212
2213        if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2214                spin_lock(&device->peer_seq_lock);
2215                newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2216                device->peer_seq = newest_peer_seq;
2217                spin_unlock(&device->peer_seq_lock);
2218                /* wake up only if we actually changed device->peer_seq */
2219                if (peer_seq == newest_peer_seq)
2220                        wake_up(&device->seq_wait);
2221        }
2222}
2223
2224static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2225{
2226        return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2227}
2228
2229/* maybe change sync_ee into interval trees as well? */
2230static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2231{
2232        struct drbd_peer_request *rs_req;
2233        bool rv = false;
2234
2235        spin_lock_irq(&device->resource->req_lock);
2236        list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2237                if (overlaps(peer_req->i.sector, peer_req->i.size,
2238                             rs_req->i.sector, rs_req->i.size)) {
2239                        rv = true;
2240                        break;
2241                }
2242        }
2243        spin_unlock_irq(&device->resource->req_lock);
2244
2245        return rv;
2246}
2247
2248/* Called from receive_Data.
2249 * Synchronize packets on sock with packets on msock.
2250 *
2251 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2252 * packet traveling on msock, they are still processed in the order they have
2253 * been sent.
2254 *
2255 * Note: we don't care for Ack packets overtaking P_DATA packets.
2256 *
2257 * In case packet_seq is larger than device->peer_seq number, there are
2258 * outstanding packets on the msock. We wait for them to arrive.
2259 * In case we are the logically next packet, we update device->peer_seq
2260 * ourselves. Correctly handles 32bit wrap around.
2261 *
2262 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2263 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2264 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2265 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
2266 *
2267 * returns 0 if we may process the packet,
2268 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2269static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2270{
2271        struct drbd_device *device = peer_device->device;
2272        DEFINE_WAIT(wait);
2273        long timeout;
2274        int ret = 0, tp;
2275
2276        if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2277                return 0;
2278
2279        spin_lock(&device->peer_seq_lock);
2280        for (;;) {
2281                if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2282                        device->peer_seq = seq_max(device->peer_seq, peer_seq);
2283                        break;
2284                }
2285
2286                if (signal_pending(current)) {
2287                        ret = -ERESTARTSYS;
2288                        break;
2289                }
2290
2291                rcu_read_lock();
2292                tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2293                rcu_read_unlock();
2294
2295                if (!tp)
2296                        break;
2297
2298                /* Only need to wait if two_primaries is enabled */
2299                prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2300                spin_unlock(&device->peer_seq_lock);
2301                rcu_read_lock();
2302                timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2303                rcu_read_unlock();
2304                timeout = schedule_timeout(timeout);
2305                spin_lock(&device->peer_seq_lock);
2306                if (!timeout) {
2307                        ret = -ETIMEDOUT;
2308                        drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2309                        break;
2310                }
2311        }
2312        spin_unlock(&device->peer_seq_lock);
2313        finish_wait(&device->seq_wait, &wait);
2314        return ret;
2315}
2316
2317/* see also bio_flags_to_wire()
2318 * DRBD_REQ_*, because we need to semantically map the flags to data packet
2319 * flags and back. We may replicate to other kernel versions. */
2320static unsigned long wire_flags_to_bio_flags(u32 dpf)
2321{
2322        return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2323                (dpf & DP_FUA ? REQ_FUA : 0) |
2324                (dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
2325}
2326
2327static unsigned long wire_flags_to_bio_op(u32 dpf)
2328{
2329        if (dpf & DP_DISCARD)
2330                return REQ_OP_WRITE_ZEROES;
2331        else
2332                return REQ_OP_WRITE;
2333}
2334
2335static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2336                                    unsigned int size)
2337{
2338        struct drbd_interval *i;
2339
2340    repeat:
2341        drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2342                struct drbd_request *req;
2343                struct bio_and_error m;
2344
2345                if (!i->local)
2346                        continue;
2347                req = container_of(i, struct drbd_request, i);
2348                if (!(req->rq_state & RQ_POSTPONED))
2349                        continue;
2350                req->rq_state &= ~RQ_POSTPONED;
2351                __req_mod(req, NEG_ACKED, &m);
2352                spin_unlock_irq(&device->resource->req_lock);
2353                if (m.bio)
2354                        complete_master_bio(device, &m);
2355                spin_lock_irq(&device->resource->req_lock);
2356                goto repeat;
2357        }
2358}
2359
/*
 * handle_write_conflicts() - resolve overlaps between an incoming peer write
 * and the intervals already registered in device->write_requests.
 *
 * @device:   device the peer request targets
 * @peer_req: freshly received peer write; its interval (peer_req->i) is
 *            inserted into device->write_requests by this function
 *
 * receive_Data() calls this with device->resource->req_lock held.
 * NOTE(review): drbd_wait_misc() presumably drops and re-takes req_lock while
 * sleeping, which would be why the overlap scan is restarted from scratch
 * after every wait - confirm against its definition.
 *
 * Return:
 *   0        no unresolved overlap is left; the caller may submit peer_req
 *            (EE_RESTART_REQUESTS may have been set on it)
 *   -ENOENT  RESOLVE_CONFLICTS is set on the connection and a local request
 *            overlapped: peer_req was queued on done_ee with
 *            e_send_superseded or e_send_retry_write as callback
 *   other    error returned by drbd_wait_misc()
 * On every non-zero return the interval has been removed again through
 * drbd_remove_epoch_entry_interval().
 */
2360static int handle_write_conflicts(struct drbd_device *device,
2361                                  struct drbd_peer_request *peer_req)
2362{
2363        struct drbd_connection *connection = peer_req->peer_device->connection;
2364        bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2365        sector_t sector = peer_req->i.sector;
2366        const unsigned int size = peer_req->i.size;
2367        struct drbd_interval *i;
2368        bool equal;
2369        int err;
2370
2371        /*
2372         * Inserting the peer request into the write_requests tree will prevent
2373         * new conflicting local requests from being added.
2374         */
2375        drbd_insert_interval(&device->write_requests, &peer_req->i);
2376
2377    repeat:
2378        drbd_for_each_overlap(i, &device->write_requests, sector, size) {
            /* Skip our own interval and intervals already marked completed. */
2379                if (i == &peer_req->i)
2380                        continue;
2381                if (i->completed)
2382                        continue;
2383
2384                if (!i->local) {
2385                        /*
2386                         * Our peer has sent a conflicting remote request; this
2387                         * should not happen in a two-node setup.  Wait for the
2388                         * earlier peer request to complete.
2389                         */
2390                        err = drbd_wait_misc(device, i);
2391                        if (err)
2392                                goto out;
2393                        goto repeat;
2394                }
2395
                    /*
                     * "equal" is only used to suppress the alert below when
                     * both writes cover exactly the same sectors.
                     */
2396                equal = i->sector == sector && i->size == size;
2397                if (resolve_conflicts) {
2398                        /*
2399                         * If the peer request is fully contained within the
2400                         * overlapping request, it can be considered overwritten
2401                         * and thus superseded; otherwise, it will be retried
2402                         * once all overlapping requests have completed.
2403                         */
2404                        bool superseded = i->sector <= sector && i->sector +
2405                                       (i->size >> 9) >= sector + (size >> 9);
2406
2407                        if (!equal)
2408                                drbd_alert(device, "Concurrent writes detected: "
2409                                               "local=%llus +%u, remote=%llus +%u, "
2410                                               "assuming %s came first\n",
2411                                          (unsigned long long)i->sector, i->size,
2412                                          (unsigned long long)sector, size,
2413                                          superseded ? "local" : "remote");
2414
                            /*
                             * Do not submit the data: queue the peer request on
                             * done_ee with the chosen answer callback and kick
                             * the ack sender work.
                             */
2415                        peer_req->w.cb = superseded ? e_send_superseded :
2416                                                   e_send_retry_write;
2417                        list_add_tail(&peer_req->w.list, &device->done_ee);
2418                        queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);
2419
                            /* -ENOENT tells receive_Data() that nothing is left to submit. */
2420                        err = -ENOENT;
2421                        goto out;
2422                } else {
2423                        struct drbd_request *req =
2424                                container_of(i, struct drbd_request, i);
2425
2426                        if (!equal)
2427                                drbd_alert(device, "Concurrent writes detected: "
2428                                               "local=%llus +%u, remote=%llus +%u\n",
2429                                          (unsigned long long)i->sector, i->size,
2430                                          (unsigned long long)sector, size);
2431
2432                        if (req->rq_state & RQ_LOCAL_PENDING ||
2433                            !(req->rq_state & RQ_POSTPONED)) {
2434                                /*
2435                                 * Wait for the node with the discard flag to
2436                                 * decide if this request has been superseded
2437                                 * or needs to be retried.
2438                                 * Requests that have been superseded will
2439                                 * disappear from the write_requests tree.
2440                                 *
2441                                 * In addition, wait for the conflicting
2442                                 * request to finish locally before submitting
2443                                 * the conflicting peer request.
2444                                 */
2445                                err = drbd_wait_misc(device, &req->i);
2446                                if (err) {
                                            /*
                                             * Waiting failed: request C_TIMEOUT on the
                                             * connection and fail the postponed local
                                             * requests overlapping this range.
                                             */
2447                                        _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2448                                        fail_postponed_requests(device, sector, size);
2449                                        goto out;
2450                                }
2451                                goto repeat;
2452                        }
2453                        /*
2454                         * Remember to restart the conflicting requests after
2455                         * the new peer request has completed.
2456                         */
2457                        peer_req->flags |= EE_RESTART_REQUESTS;
2458                }
2459        }
2460        err = 0;
2461
2462    out:
            /* On any failure take our interval back out of the tree. */
2463        if (err)
2464                drbd_remove_epoch_entry_interval(device, peer_req);
2465        return err;
2466}
2467
2468/* mirrored write */
/*
 * receive_Data() - receive one mirrored write from the peer and submit it to
 * the local backing device.
 *
 * @connection: connection the packet arrived on
 * @pi:         packet info; pi->data holds the struct p_data header, pi->vnr
 *              selects the volume, pi->size is the remaining payload size
 *
 * Outline:
 *  - Without a local disk (get_ldev() fails) the write is answered with
 *    P_NEG_ACK, still counted in the current epoch, and its payload drained.
 *  - Otherwise the payload is read into a peer request, which is assigned to
 *    the connection's current epoch and flagged according to dp_flags.
 *  - With two_primaries configured, the peer request goes through
 *    handle_write_conflicts() before it may be submitted.
 *  - The request is then submitted with drbd_submit_peer_request().
 *
 * Return: 0 if the request was submitted, or if handle_write_conflicts()
 * returned -ENOENT; -EIO if there is no such peer device or the payload
 * could not be read; otherwise the error from the failing step.
 */
2469static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2470{
2471        struct drbd_peer_device *peer_device;
2472        struct drbd_device *device;
2473        struct net_conf *nc;
2474        sector_t sector;
2475        struct drbd_peer_request *peer_req;
2476        struct p_data *p = pi->data;
2477        u32 peer_seq = be32_to_cpu(p->seq_num);
2478        int op, op_flags;
2479        u32 dp_flags;
2480        int err, tp;
2481
2482        peer_device = conn_peer_device(connection, pi->vnr);
2483        if (!peer_device)
2484                return -EIO;
2485        device = peer_device->device;
2486
2487        if (!get_ldev(device)) {
2488                int err2;
2489
                    /*
                     * No local disk: negatively acknowledge the write, account
                     * it in the current epoch and discard the payload.  The
                     * first error encountered is the one reported.
                     */
2490                err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2491                drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2492                atomic_inc(&connection->current_epoch->epoch_size);
2493                err2 = drbd_drain_block(peer_device, pi->size);
2494                if (!err)
2495                        err = err2;
2496                return err;
2497        }
2498
2499        /*
2500         * Corresponding put_ldev done either below (on various errors), or in
2501         * drbd_peer_request_endio, if we successfully submit the data at the
2502         * end of this function.
2503         */
2504
2505        sector = be64_to_cpu(p->sector);
2506        peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2507        if (!peer_req) {
2508                put_ldev(device);
2509                return -EIO;
2510        }
2511
2512        peer_req->w.cb = e_end_block;
2513        peer_req->submit_jif = jiffies;
2514        peer_req->flags |= EE_APPLICATION;
2515
            /* Translate the on-wire flags into the bio operation and bio flags. */
2516        dp_flags = be32_to_cpu(p->dp_flags);
2517        op = wire_flags_to_bio_op(dp_flags);
2518        op_flags = wire_flags_to_bio_flags(dp_flags);
2519        if (pi->cmd == P_TRIM) {
2520                D_ASSERT(peer_device, peer_req->i.size > 0);
2521                D_ASSERT(peer_device, op == REQ_OP_WRITE_ZEROES);
2522                D_ASSERT(peer_device, peer_req->pages == NULL);
2523        } else if (peer_req->pages == NULL) {
2524                D_ASSERT(device, peer_req->i.size == 0);
2525                D_ASSERT(device, dp_flags & DP_FLUSH);
2526        }
2527
2528        if (dp_flags & DP_MAY_SET_IN_SYNC)
2529                peer_req->flags |= EE_MAY_SET_IN_SYNC;
2530
            /* Attach the request to the current epoch and count it there. */
2531        spin_lock(&connection->epoch_lock);
2532        peer_req->epoch = connection->current_epoch;
2533        atomic_inc(&peer_req->epoch->epoch_size);
2534        atomic_inc(&peer_req->epoch->active);
2535        spin_unlock(&connection->epoch_lock);
2536
            /*
             * For agreed protocol versions below 100 the ack flags are derived
             * here from the configured wire protocol.
             */
2537        rcu_read_lock();
2538        nc = rcu_dereference(peer_device->connection->net_conf);
2539        tp = nc->two_primaries;
2540        if (peer_device->connection->agreed_pro_version < 100) {
2541                switch (nc->wire_protocol) {
2542                case DRBD_PROT_C:
2543                        dp_flags |= DP_SEND_WRITE_ACK;
2544                        break;
2545                case DRBD_PROT_B:
2546                        dp_flags |= DP_SEND_RECEIVE_ACK;
2547                        break;
2548                }
2549        }
2550        rcu_read_unlock();
2551
2552        if (dp_flags & DP_SEND_WRITE_ACK) {
2553                peer_req->flags |= EE_SEND_WRITE_ACK;
2554                inc_unacked(device);
2555                /* corresponding dec_unacked() in e_end_block()
2556                 * respective _drbd_clear_done_ee */
2557        }
2558
2559        if (dp_flags & DP_SEND_RECEIVE_ACK) {
2560                /* I really don't like it that the receiver thread
2561                 * sends on the msock, but anyways */
2562                drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
2563        }
2564
2565        if (tp) {
2566                /* two primaries implies protocol C */
2567                D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2568                peer_req->flags |= EE_IN_INTERVAL_TREE;
2569                err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2570                if (err)
2571                        goto out_interrupted;
2572                spin_lock_irq(&device->resource->req_lock);
2573                err = handle_write_conflicts(device, peer_req);
2574                if (err) {
2575                        spin_unlock_irq(&device->resource->req_lock);
2576                        if (err == -ENOENT) {
                                    /*
                                     * handle_write_conflicts() queued peer_req on
                                     * done_ee; only drop our ldev reference and
                                     * report success.
                                     */
2577                                put_ldev(device);
2578                                return 0;
2579                        }
2580                        goto out_interrupted;
2581                }
2582        } else {
2583                update_peer_seq(peer_device, peer_seq);
2584                spin_lock_irq(&device->resource->req_lock);
2585        }
            /* Both branches above leave req_lock held on this path. */
2586        /* TRIM and WRITE_SAME are processed synchronously,
2587         * we wait for all pending requests, respectively wait for
2588         * active_ee to become empty in drbd_submit_peer_request();
2589         * better not add ourselves here. */
2590        if ((peer_req->flags & (EE_IS_TRIM|EE_WRITE_SAME)) == 0)
2591                list_add_tail(&peer_req->w.list, &device->active_ee);
2592        spin_unlock_irq(&device->resource->req_lock);
2593
            /* As sync target, wait until no resync write overlaps this request. */
2594        if (device->state.conn == C_SYNC_TARGET)
2595                wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2596
2597        if (device->state.pdsk < D_INCONSISTENT) {
2598                /* In case we have the only disk of the cluster, */
2599                drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2600                peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2601                drbd_al_begin_io(device, &peer_req->i);
2602                peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2603        }
2604
2605        err = drbd_submit_peer_request(device, peer_req, op, op_flags,
2606                                       DRBD_FAULT_DT_WR);
2607        if (!err)
2608                return 0;
2609
2610        /* don't care for the reason here */
2611        drbd_err(device, "submit failed, triggering re-connect\n");
            /* Undo the list, interval tree and activity log bookkeeping from above. */
2612        spin_lock_irq(&device->resource->req_lock);
2613        list_del(&peer_req->w.list);
2614        drbd_remove_epoch_entry_interval(device, peer_req);
2615        spin_unlock_irq(&device->resource->req_lock);
2616        if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2617                peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2618                drbd_al_complete_io(device, &peer_req->i);
2619        }
2620
2621out_interrupted:
            /* Common failure exit: release the epoch, the ldev reference and the request. */
2622        drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
2623        put_ldev(device);
2624        drbd_free_peer_req(device, peer_req);
2625        return err;
2626}
2627
2628/* We may throttle resync, if the lower device seems to be busy,
2629 * and current sync rate is above c_min_rate.
2630 *
2631 * To decide whether or not the lower device is busy, we use a scheme similar
2632 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2633 * (more than 64 sectors) of activity we cannot account for with our own resync
2634 * activity, it obviously is "busy".
2635 *
2636 * The current sync rate used here uses only the most recent two step marks,
2637 * to have a short time average so we can react faster.
2638 */
2639bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2640                bool throttle_if_app_is_waiting)
2641{
2642        struct lc_element *tmp;
2643        bool throttle = drbd_rs_c_min_rate_throttle(device);
2644
2645        if (!throttle || throttle_if_app_is_waiting)
2646                return throttle;
2647
2648        spin_lock_irq(&device->al_lock);
2649        tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2650        if (tmp) {
2651                struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2652                if (test_bit(BME_PRIORITY, &bm_ext->flags))
2653                        throttle = false;
2654                /* Do not slow down if app IO is already waiting for this extent,
2655                 * and our progress is necessary for application IO to complete. */
2656        }
2657        spin_unlock_irq(&device->al_lock);
2658
2659        return throttle;
2660}
2661
2662bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2663{
2664        struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2665        unsigned long db, dt, dbdt;
2666        unsigned int c_min_rate;
2667        int curr_events;
2668
2669        rcu_read_lock();
2670        c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2671        rcu_read_unlock();
2672
2673        /* feature disabled? */
2674        if (c_min_rate == 0)
2675                return false;
2676
2677        curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2678                      (int)part_stat_read(&disk->part0, sectors[1]) -
2679                        atomic_read(&device->rs_sect_ev);
2680
2681        if (atomic_read(&device->ap_actlog_cnt)
2682            || curr_events - device->rs_last_events > 64) {
2683                unsigned long rs_left;
2684                int i;
2685
2686                device->rs_last_events = curr_events;
2687
2688                /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2689                 * approx. */
2690                i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2691
2692                if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2693                        rs_left = device->ov_left;
2694                else
2695                        rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2696
2697                dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2698                if (!dt)
2699                        dt++;
2700                db = device->rs_mark_left[i] - rs_left;
2701                dbdt = Bit2KB(db/dt);
2702
2703                if (dbdt > c_min_rate)
2704                        return true;
2705        }
2706        return false;
2707}
2708
/*
 * receive_DataRequest() - serve a read-type request from the peer.
 *
 * @connection: connection the packet arrived on
 * @pi:         packet info; pi->data holds the struct p_block_req header
 *
 * Handles P_DATA_REQUEST, P_RS_THIN_REQ, P_RS_DATA_REQUEST,
 * P_CSUM_RS_REQUEST, P_OV_REQUEST and P_OV_REPLY; any other pi->cmd hits
 * BUG().  For each command the matching completion callback and fault type
 * are selected, then the block is read from the local disk with
 * drbd_submit_peer_request(..., REQ_OP_READ, ...).
 *
 * P_DATA_REQUEST is submitted directly.  P_OV_REPLY skips the throttle and
 * drbd_rs_begin_io() but is still accounted in rs_sect_ev.  All other
 * commands are put on read_ee, possibly throttled, and go through
 * drbd_rs_begin_io() first.
 *
 * Return: 0 if the read was submitted; -EINVAL for an invalid size or a
 * range beyond the device capacity; -ENOMEM if no peer request could be
 * allocated; the result of drbd_drain_block() if there is no up-to-date
 * local disk; -EIO if there is no such peer device or on any later failure.
 */
2709static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2710{
2711        struct drbd_peer_device *peer_device;
2712        struct drbd_device *device;
2713        sector_t sector;
2714        sector_t capacity;
2715        struct drbd_peer_request *peer_req;
2716        struct digest_info *di = NULL;
2717        int size, verb;
2718        unsigned int fault_type;
2719        struct p_block_req *p = pi->data;
2720
2721        peer_device = conn_peer_device(connection, pi->vnr);
2722        if (!peer_device)
2723                return -EIO;
2724        device = peer_device->device;
2725        capacity = drbd_get_capacity(device->this_bdev);
2726
2727        sector = be64_to_cpu(p->sector);
2728        size   = be32_to_cpu(p->blksize);
2729
            /* Reject sizes that are non-positive, not 512-byte aligned, or too large. */
2730        if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2731                drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2732                                (unsigned long long)sector, size);
2733                return -EINVAL;
2734        }
            /* Reject requests reaching beyond the end of the device. */
2735        if (sector + (size>>9) > capacity) {
2736                drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2737                                (unsigned long long)sector, size);
2738                return -EINVAL;
2739        }
2740
2741        if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
                    /*
                     * No up-to-date local disk: answer with the negative reply
                     * matching the request type.  "verb" selects whether the
                     * rate-limited error message below is printed.
                     */
2742                verb = 1;
2743                switch (pi->cmd) {
2744                case P_DATA_REQUEST:
2745                        drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2746                        break;
2747                case P_RS_THIN_REQ:
2748                case P_RS_DATA_REQUEST:
2749                case P_CSUM_RS_REQUEST:
2750                case P_OV_REQUEST:
2751                        drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2752                        break;
2753                case P_OV_REPLY:
2754                        verb = 0;
2755                        dec_rs_pending(device);
2756                        drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2757                        break;
2758                default:
2759                        BUG();
2760                }
2761                if (verb && __ratelimit(&drbd_ratelimit_state))
2762                        drbd_err(device, "Can not satisfy peer's read request, "
2763                            "no local data.\n");
2764
2765                /* drain possible payload */
2766                return drbd_drain_block(peer_device, pi->size);
2767        }
2768
2769        /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2770         * "criss-cross" setup, that might cause write-out on some other DRBD,
2771         * which in turn might block on the other node at this very place.  */
2772        peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2773                        size, GFP_NOIO);
2774        if (!peer_req) {
2775                put_ldev(device);
2776                return -ENOMEM;
2777        }
2778
2779        switch (pi->cmd) {
2780        case P_DATA_REQUEST:
2781                peer_req->w.cb = w_e_end_data_req;
2782                fault_type = DRBD_FAULT_DT_RD;
2783                /* application IO, don't drbd_rs_begin_io */
2784                peer_req->flags |= EE_APPLICATION;
2785                goto submit;
2786
2787        case P_RS_THIN_REQ:
2788                /* If at some point in the future we have a smart way to
2789                   find out if this data block is completely deallocated,
2790                   then we would do something smarter here than reading
2791                   the block... */
2792                peer_req->flags |= EE_RS_THIN_REQ;
                    /* fall through */
2793        case P_RS_DATA_REQUEST:
2794                peer_req->w.cb = w_e_end_rsdata_req;
2795                fault_type = DRBD_FAULT_RS_RD;
2796                /* used in the sector offset progress display */
2797                device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2798                break;
2799
2800        case P_OV_REPLY:
2801        case P_CSUM_RS_REQUEST:
2802                fault_type = DRBD_FAULT_RS_RD;
                    /*
                     * Both packet types carry a digest of pi->size bytes; it
                     * is stored directly behind the digest_info header.
                     */
2803                di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2804                if (!di)
2805                        goto out_free_e;
2806
2807                di->digest_size = pi->size;
2808                di->digest = (((char *)di)+sizeof(struct digest_info));
2809
2810                peer_req->digest = di;
2811                peer_req->flags |= EE_HAS_DIGEST;
2812
2813                if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2814                        goto out_free_e;
2815
2816                if (pi->cmd == P_CSUM_RS_REQUEST) {
2817                        D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2818                        peer_req->w.cb = w_e_end_csum_rs_req;
2819                        /* used in the sector offset progress display */
2820                        device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2821                        /* remember to report stats in drbd_resync_finished */
2822                        device->use_csums = true;
2823                } else if (pi->cmd == P_OV_REPLY) {
2824                        /* track progress, we may need to throttle */
2825                        atomic_add(size >> 9, &device->rs_sect_in);
2826                        peer_req->w.cb = w_e_end_ov_reply;
2827                        dec_rs_pending(device);
2828                        /* drbd_rs_begin_io done when we sent this request,
2829                         * but accounting still needs to be done. */
2830                        goto submit_for_resync;
2831                }
2832                break;
2833
2834        case P_OV_REQUEST:
                    /*
                     * First verify request of a run (ov_start_sector still
                     * unset) with protocol >= 90: initialise the online-verify
                     * progress bookkeeping from this sector.
                     */
2835                if (device->ov_start_sector == ~(sector_t)0 &&
2836                    peer_device->connection->agreed_pro_version >= 90) {
2837                        unsigned long now = jiffies;
2838                        int i;
2839                        device->ov_start_sector = sector;
2840                        device->ov_position = sector;
2841                        device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2842                        device->rs_total = device->ov_left;
2843                        for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2844                                device->rs_mark_left[i] = device->ov_left;
2845                                device->rs_mark_time[i] = now;
2846                        }
2847                        drbd_info(device, "Online Verify start sector: %llu\n",
2848                                        (unsigned long long)sector);
2849                }
2850                peer_req->w.cb = w_e_end_ov_req;
2851                fault_type = DRBD_FAULT_RS_RD;
2852                break;
2853
2854        default:
2855                BUG();
2856        }
2857
2858        /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2859         * wrt the receiver, but it is not as straightforward as it may seem.
2860         * Various places in the resync start and stop logic assume resync
2861         * requests are processed in order, requeuing this on the worker thread
2862         * introduces a bunch of new code for synchronization between threads.
2863         *
2864         * Unlimited throttling before drbd_rs_begin_io may stall the resync
2865         * "forever", throttling after drbd_rs_begin_io will lock that extent
2866         * for application writes for the same time.  For now, just throttle
2867         * here, where the rest of the code expects the receiver to sleep for
2868         * a while, anyways.
2869         */
2870
2871        /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2872         * this defers syncer requests for some time, before letting at least
2873         * one request through.  The resync controller on the receiving side
2874         * will adapt to the incoming rate accordingly.
2875         *
2876         * We cannot throttle here if remote is Primary/SyncTarget:
2877         * we would also throttle its application reads.
2878         * In that case, throttling is done on the SyncTarget only.
2879         */
2880
2881        /* Even though this may be a resync request, we do add to "read_ee";
2882         * "sync_ee" is only used for resync WRITEs.
2883         * Add to list early, so debugfs can find this request
2884         * even if we have to sleep below. */
2885        spin_lock_irq(&device->resource->req_lock);
2886        list_add_tail(&peer_req->w.list, &device->read_ee);
2887        spin_unlock_irq(&device->resource->req_lock);
2888
2889        update_receiver_timing_details(connection, drbd_rs_should_slow_down);
2890        if (device->state.peer != R_PRIMARY
2891        && drbd_rs_should_slow_down(device, sector, false))
2892                schedule_timeout_uninterruptible(HZ/10);
2893        update_receiver_timing_details(connection, drbd_rs_begin_io);
2894        if (drbd_rs_begin_io(device, sector))
2895                goto out_free_e;
2896
2897submit_for_resync:
            /* Account the sectors we are about to read on behalf of resync/verify. */
2898        atomic_add(size >> 9, &device->rs_sect_ev);
2899
2900submit:
2901        update_receiver_timing_details(connection, drbd_submit_peer_request);
2902        inc_unacked(device);
2903        if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
2904                                     fault_type) == 0)
2905                return 0;
2906
2907        /* don't care for the reason here */
2908        drbd_err(device, "submit failed, triggering re-connect\n");
2909
2910out_free_e:
            /*
             * NOTE(review): some paths reach this label before peer_req was put
             * on read_ee; list_del() is then only safe if drbd_alloc_peer_req()
             * initialises w.list as an empty list head - confirm.
             */
2911        spin_lock_irq(&device->resource->req_lock);
2912        list_del(&peer_req->w.list);
2913        spin_unlock_irq(&device->resource->req_lock);
2914        /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2915
2916        put_ldev(device);
2917        drbd_free_peer_req(device, peer_req);
2918        return -EIO;
2919}
2920
2921/**
2922 * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
2923 */
2924static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2925{
2926        struct drbd_device *device = peer_device->device;
2927        int self, peer, rv = -100;
2928        unsigned long ch_self, ch_peer;
2929        enum drbd_after_sb_p after_sb_0p;
2930
2931        self = device->ldev->md.uuid[UI_BITMAP] & 1;
2932        peer = device->p_uuid[UI_BITMAP] & 1;
2933
2934        ch_peer = device->p_uuid[UI_SIZE];
2935        ch_self = device->comm_bm_set;
2936
2937        rcu_read_lock();
2938        after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2939        rcu_read_unlock();
2940        switch (after_sb_0p) {
2941        case ASB_CONSENSUS:
2942        case ASB_DISCARD_SECONDARY:
2943        case ASB_CALL_HELPER:
2944        case ASB_VIOLENTLY:
2945                drbd_err(device, "Configuration error.\n");
2946                break;
2947        case ASB_DISCONNECT:
2948                break;
2949        case ASB_DISCARD_YOUNGER_PRI:
2950                if (self == 0 && peer == 1) {
2951                        rv = -1;
2952                        break;
2953                }
2954                if (self == 1 && peer == 0) {
2955                        rv =  1;
2956                        break;
2957                }
2958                /* Else fall through to one of the other strategies... */
2959        case ASB_DISCARD_OLDER_PRI:
2960                if (self == 0 && peer == 1) {
2961                        rv = 1;
2962                        break;
2963                }
2964                if (self == 1 && peer == 0) {
2965                        rv = -1;
2966                        break;
2967                }
2968                /* Else fall through to one of the other strategies... */
2969                drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2970                     "Using discard-least-changes instead\n");
2971        case ASB_DISCARD_ZERO_CHG:
2972                if (ch_peer == 0 && ch_self == 0) {
2973                        rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2974                                ? -1 : 1;
2975                        break;
2976                } else {
2977                        if (ch_peer == 0) { rv =  1; break; }
2978                        if (ch_self == 0) { rv = -1; break; }
2979                }
2980                if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2981                        break;
2982        case ASB_DISCARD_LEAST_CHG:
2983                if      (ch_self < ch_peer)
2984                        rv = -1;
2985                else if (ch_self > ch_peer)
2986                        rv =  1;
2987                else /* ( ch_self == ch_peer ) */
2988                     /* Well, then use something else. */
2989                        rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2990                                ? -1 : 1;
2991                break;
2992        case ASB_DISCARD_LOCAL:
2993                rv = -1;
2994                break;
2995        case ASB_DISCARD_REMOTE:
2996                rv =  1;
2997        }
2998
2999        return rv;
3000}
3001
3002/**
3003 * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
3004 */
3005static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
3006{
3007        struct drbd_device *device = peer_device->device;
3008        int hg, rv = -100;
3009        enum drbd_after_sb_p after_sb_1p;
3010
3011        rcu_read_lock();
3012        after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
3013        rcu_read_unlock();
3014        switch (after_sb_1p) {
3015        case ASB_DISCARD_YOUNGER_PRI:
3016        case ASB_DISCARD_OLDER_PRI:
3017        case ASB_DISCARD_LEAST_CHG:
3018        case ASB_DISCARD_LOCAL:
3019        case ASB_DISCARD_REMOTE:
3020        case ASB_DISCARD_ZERO_CHG:
3021                drbd_err(device, "Configuration error.\n");
3022                break;
3023        case ASB_DISCONNECT:
3024                break;
3025        case ASB_CONSENSUS:
3026                hg = drbd_asb_recover_0p(peer_device);
3027                if (hg == -1 && device->state.role == R_SECONDARY)
3028                        rv = hg;
3029                if (hg == 1  && device->state.role == R_PRIMARY)
3030                        rv = hg;
3031                break;
3032        case ASB_VIOLENTLY:
3033                rv = drbd_asb_recover_0p(peer_device);
3034                break;
3035        case ASB_DISCARD_SECONDARY:
3036                return device->state.role == R_PRIMARY ? 1 : -1;
3037        case ASB_CALL_HELPER:
3038                hg = drbd_asb_recover_0p(peer_device);
3039                if (hg == -1 && device->state.role == R_PRIMARY) {
3040                        enum drbd_state_rv rv2;
3041
3042                         /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3043                          * we might be here in C_WF_REPORT_PARAMS which is transient.
3044                          * we do not need to wait for the after state change work either. */
3045                        rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3046                        if (rv2 != SS_SUCCESS) {
3047                                drbd_khelper(device, "pri-lost-after-sb");
3048                        } else {
3049                                drbd_warn(device, "Successfully gave up primary role.\n");
3050                                rv = hg;
3051                        }
3052                } else
3053                        rv = hg;
3054        }
3055
3056        return rv;
3057}
3058
3059/**
3060 * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
3061 */
3062static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
3063{
3064        struct drbd_device *device = peer_device->device;
3065        int hg, rv = -100;
3066        enum drbd_after_sb_p after_sb_2p;
3067
3068        rcu_read_lock();
3069        after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
3070        rcu_read_unlock();
3071        switch (after_sb_2p) {
3072        case ASB_DISCARD_YOUNGER_PRI:
3073        case ASB_DISCARD_OLDER_PRI:
3074        case ASB_DISCARD_LEAST_CHG:
3075        case ASB_DISCARD_LOCAL:
3076        case ASB_DISCARD_REMOTE:
3077        case ASB_CONSENSUS:
3078        case ASB_DISCARD_SECONDARY:
3079        case ASB_DISCARD_ZERO_CHG:
3080                drbd_err(device, "Configuration error.\n");
3081                break;
3082        case ASB_VIOLENTLY:
3083                rv = drbd_asb_recover_0p(peer_device);
3084                break;
3085        case ASB_DISCONNECT:
3086                break;
3087        case ASB_CALL_HELPER:
3088                hg = drbd_asb_recover_0p(peer_device);
3089                if (hg == -1) {
3090                        enum drbd_state_rv rv2;
3091
3092                         /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3093                          * we might be here in C_WF_REPORT_PARAMS which is transient.
3094                          * we do not need to wait for the after state change work either. */
3095                        rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3096                        if (rv2 != SS_SUCCESS) {
3097                                drbd_khelper(device, "pri-lost-after-sb");
3098                        } else {
3099                                drbd_warn(device, "Successfully gave up primary role.\n");
3100                                rv = hg;
3101                        }
3102                } else
3103                        rv = hg;
3104        }
3105
3106        return rv;
3107}
3108
3109static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
3110                           u64 bits, u64 flags)
3111{
3112        if (!uuid) {
3113                drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
3114                return;
3115        }
3116        drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
3117             text,
3118             (unsigned long long)uuid[UI_CURRENT],
3119             (unsigned long long)uuid[UI_BITMAP],
3120             (unsigned long long)uuid[UI_HISTORY_START],
3121             (unsigned long long)uuid[UI_HISTORY_END],
3122             (unsigned long long)bits,
3123             (unsigned long long)flags);
3124}
3125
3126/*
3127  100   after split brain try auto recover
3128    2   C_SYNC_SOURCE set BitMap
3129    1   C_SYNC_SOURCE use BitMap
3130    0   no Sync
3131   -1   C_SYNC_TARGET use BitMap
3132   -2   C_SYNC_TARGET set BitMap
3133 -100   after split brain, disconnect
3134-1000   unrelated data
3135-1091   requires proto 91
3136-1096   requires proto 96
3137 */
3138
3139static int drbd_uuid_compare(struct drbd_device *const device, enum drbd_role const peer_role, int *rule_nr) __must_hold(local)
3140{
3141        struct drbd_peer_device *const peer_device = first_peer_device(device);
3142        struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
3143        u64 self, peer;
3144        int i, j;
3145
3146        self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3147        peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3148
3149        *rule_nr = 10;
3150        if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
3151                return 0;
3152
3153        *rule_nr = 20;
3154        if ((self == UUID_JUST_CREATED || self == (u64)0) &&
3155             peer != UUID_JUST_CREATED)
3156                return -2;
3157
3158        *rule_nr = 30;
3159        if (self != UUID_JUST_CREATED &&
3160            (peer == UUID_JUST_CREATED || peer == (u64)0))
3161                return 2;
3162
3163        if (self == peer) {
3164                int rct, dc; /* roles at crash time */
3165
3166                if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
3167
3168                        if (connection->agreed_pro_version < 91)
3169                                return -1091;
3170
3171                        if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
3172                            (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
3173                                drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
3174                                drbd_uuid_move_history(device);
3175                                device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
3176                                device->ldev->md.uuid[UI_BITMAP] = 0;
3177
3178                                drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3179                                               device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3180                                *rule_nr = 34;
3181                        } else {
3182                                drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
3183                                *rule_nr = 36;
3184                        }
3185
3186                        return 1;
3187                }
3188
3189                if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
3190
3191                        if (connection->agreed_pro_version < 91)
3192                                return -1091;
3193
3194                        if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
3195                            (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
3196                                drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
3197
3198                                device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
3199                                device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
3200                                device->p_uuid[UI_BITMAP] = 0UL;
3201
3202                                drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3203                                *rule_nr = 35;
3204                        } else {
3205                                drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3206                                *rule_nr = 37;
3207                        }
3208
3209                        return -1;
3210                }
3211
3212                /* Common power [off|failure] */
3213                rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3214                        (device->p_uuid[UI_FLAGS] & 2);
3215                /* lowest bit is set when we were primary,
3216                 * next bit (weight 2) is set when peer was primary */
3217                *rule_nr = 40;
3218
3219                /* Neither has the "crashed primary" flag set,
3220                 * only a replication link hickup. */
3221                if (rct == 0)
3222                        return 0;
3223
3224                /* Current UUID equal and no bitmap uuid; does not necessarily
3225                 * mean this was a "simultaneous hard crash", maybe IO was
3226                 * frozen, so no UUID-bump happened.
3227                 * This is a protocol change, overload DRBD_FF_WSAME as flag
3228                 * for "new-enough" peer DRBD version. */
3229                if (device->state.role == R_PRIMARY || peer_role == R_PRIMARY) {
3230                        *rule_nr = 41;
3231                        if (!(connection->agreed_features & DRBD_FF_WSAME)) {
3232                                drbd_warn(peer_device, "Equivalent unrotated UUIDs, but current primary present.\n");
3233                                return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8));
3234                        }
3235                        if (device->state.role == R_PRIMARY && peer_role == R_PRIMARY) {
3236                                /* At least one has the "crashed primary" bit set,
3237                                 * both are primary now, but neither has rotated its UUIDs?
3238                                 * "Can not happen." */
3239                                drbd_err(peer_device, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n");
3240                                return -100;
3241                        }
3242                        if (device->state.role == R_PRIMARY)
3243                                return 1;
3244                        return -1;
3245                }
3246
3247                /* Both are secondary.
3248                 * Really looks like recovery from simultaneous hard crash.
3249                 * Check which had been primary before, and arbitrate. */
3250                switch (rct) {
3251                case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */
3252                case 1: /*  self_pri && !peer_pri */ return 1;
3253                case 2: /* !self_pri &&  peer_pri */ return -1;
3254                case 3: /*  self_pri &&  peer_pri */
3255                        dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3256                        return dc ? -1 : 1;
3257                }
3258        }
3259
3260        *rule_nr = 50;
3261        peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3262        if (self == peer)
3263                return -1;
3264
3265        *rule_nr = 51;
3266        peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3267        if (self == peer) {
3268                if (connection->agreed_pro_version < 96 ?
3269                    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3270                    (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3271                    peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3272                        /* The last P_SYNC_UUID did not get though. Undo the last start of
3273                           resync as sync source modifications of the peer's UUIDs. */
3274
3275                        if (connection->agreed_pro_version < 91)
3276                                return -1091;
3277
3278                        device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3279                        device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3280
3281                        drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3282                        drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3283
3284                        return -1;
3285                }
3286        }
3287
3288        *rule_nr = 60;
3289        self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3290        for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3291                peer = device->p_uuid[i] & ~((u64)1);
3292                if (self == peer)
3293                        return -2;
3294        }
3295
3296        *rule_nr = 70;
3297        self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3298        peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3299        if (self == peer)
3300                return 1;
3301
3302        *rule_nr = 71;
3303        self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3304        if (self == peer) {
3305                if (connection->agreed_pro_version < 96 ?
3306                    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3307                    (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3308                    self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3309                        /* The last P_SYNC_UUID did not get though. Undo the last start of
3310                           resync as sync source modifications of our UUIDs. */
3311
3312                        if (connection->agreed_pro_version < 91)
3313                                return -1091;
3314
3315                        __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3316                        __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3317
3318                        drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3319                        drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3320                                       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3321
3322                        return 1;
3323                }
3324        }
3325
3326
3327        *rule_nr = 80;
3328        peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3329        for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3330                self = device->ldev->md.uuid[i] & ~((u64)1);
3331                if (self == peer)
3332                        return 2;
3333        }
3334
3335        *rule_nr = 90;
3336        self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3337        peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3338        if (self == peer && self != ((u64)0))
3339                return 100;
3340
3341        *rule_nr = 100;
3342        for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3343                self = device->ldev->md.uuid[i] & ~((u64)1);
3344                for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3345                        peer = device->p_uuid[j] & ~((u64)1);
3346                        if (self == peer)
3347                                return -100;
3348                }
3349        }
3350
3351        return -1000;
3352}
3353
3354/* drbd_sync_handshake() returns the new conn state on success, or
3355   CONN_MASK (-1) on failure.
3356 */
3357static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3358                                           enum drbd_role peer_role,
3359                                           enum drbd_disk_state peer_disk) __must_hold(local)
3360{
3361        struct drbd_device *device = peer_device->device;
3362        enum drbd_conns rv = C_MASK;
3363        enum drbd_disk_state mydisk;
3364        struct net_conf *nc;
3365        int hg, rule_nr, rr_conflict, tentative;
3366
3367        mydisk = device->state.disk;
3368        if (mydisk == D_NEGOTIATING)
3369                mydisk = device->new_state_tmp.disk;
3370
3371        drbd_info(device, "drbd_sync_handshake:\n");
3372
3373        spin_lock_irq(&device->ldev->md.uuid_lock);
3374        drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3375        drbd_uuid_dump(device, "peer", device->p_uuid,
3376                       device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3377
3378        hg = drbd_uuid_compare(device, peer_role, &rule_nr);
3379        spin_unlock_irq(&device->ldev->md.uuid_lock);
3380
3381        drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3382
3383        if (hg == -1000) {
3384                drbd_alert(device, "Unrelated data, aborting!\n");
3385                return C_MASK;
3386        }
3387        if (hg < -0x10000) {
3388                int proto, fflags;
3389                hg = -hg;
3390                proto = hg & 0xff;
3391                fflags = (hg >> 8) & 0xff;
3392                drbd_alert(device, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n",
3393                                        proto, fflags);
3394                return C_MASK;
3395        }
3396        if (hg < -1000) {
3397                drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3398                return C_MASK;
3399        }
3400
3401        if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3402            (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3403                int f = (hg == -100) || abs(hg) == 2;
3404                hg = mydisk > D_INCONSISTENT ? 1 : -1;
3405                if (f)
3406                        hg = hg*2;
3407                drbd_info(device, "Becoming sync %s due to disk states.\n",
3408                     hg > 0 ? "source" : "target");
3409        }
3410
3411        if (abs(hg) == 100)
3412                drbd_khelper(device, "initial-split-brain");
3413
3414        rcu_read_lock();
3415        nc = rcu_dereference(peer_device->connection->net_conf);
3416
3417        if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3418                int pcount = (device->state.role == R_PRIMARY)
3419                           + (peer_role == R_PRIMARY);
3420                int forced = (hg == -100);
3421
3422                switch (pcount) {
3423                case 0:
3424                        hg = drbd_asb_recover_0p(peer_device);
3425                        break;
3426                case 1:
3427                        hg = drbd_asb_recover_1p(peer_device);
3428                        break;
3429                case 2:
3430                        hg = drbd_asb_recover_2p(peer_device);
3431                        break;
3432                }
3433                if (abs(hg) < 100) {
3434                        drbd_warn(device, "Split-Brain detected, %d primaries, "
3435                             "automatically solved. Sync from %s node\n",
3436                             pcount, (hg < 0) ? "peer" : "this");
3437                        if (forced) {
3438                                drbd_warn(device, "Doing a full sync, since"
3439                                     " UUIDs where ambiguous.\n");
3440                                hg = hg*2;
3441                        }
3442                }
3443        }
3444
3445        if (hg == -100) {
3446                if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3447                        hg = -1;
3448                if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3449                        hg = 1;
3450
3451                if (abs(hg) < 100)
3452                        drbd_warn(device, "Split-Brain detected, manually solved. "
3453                             "Sync from %s node\n",
3454                             (hg < 0) ? "peer" : "this");
3455        }
3456        rr_conflict = nc->rr_conflict;
3457        tentative = nc->tentative;
3458        rcu_read_unlock();
3459
3460        if (hg == -100) {
3461                /* FIXME this log message is not correct if we end up here
3462                 * after an attempted attach on a diskless node.
3463                 * We just refuse to attach -- well, we drop the "connection"
3464                 * to that disk, in a way... */
3465                drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3466                drbd_khelper(device, "split-brain");
3467                return C_MASK;
3468        }
3469
3470        if (hg > 0 && mydisk <= D_INCONSISTENT) {
3471                drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3472                return C_MASK;
3473        }
3474
3475        if (hg < 0 && /* by intention we do not use mydisk here. */
3476            device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3477                switch (rr_conflict) {
3478                case ASB_CALL_HELPER:
3479                        drbd_khelper(device, "pri-lost");
3480                        /* fall through */
3481                case ASB_DISCONNECT:
3482                        drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3483                        return C_MASK;
3484                case ASB_VIOLENTLY:
3485                        drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3486                             "assumption\n");
3487                }
3488        }
3489
3490        if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3491                if (hg == 0)
3492                        drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3493                else
3494                        drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3495                                 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3496                                 abs(hg) >= 2 ? "full" : "bit-map based");
3497                return C_MASK;
3498        }
3499
3500        if (abs(hg) >= 2) {
3501                drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3502                if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3503                                        BM_LOCKED_SET_ALLOWED))
3504                        return C_MASK;
3505        }
3506
3507        if (hg > 0) { /* become sync source. */
3508                rv = C_WF_BITMAP_S;
3509        } else if (hg < 0) { /* become sync target */
3510                rv = C_WF_BITMAP_T;
3511        } else {
3512                rv = C_CONNECTED;
3513                if (drbd_bm_total_weight(device)) {
3514                        drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3515                             drbd_bm_total_weight(device));
3516                }
3517        }
3518
3519        return rv;
3520}
3521
3522static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3523{
3524        /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3525        if (peer == ASB_DISCARD_REMOTE)
3526                return ASB_DISCARD_LOCAL;
3527
3528        /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3529        if (peer == ASB_DISCARD_LOCAL)
3530                return ASB_DISCARD_REMOTE;
3531
3532        /* everything else is valid if they are equal on both sides. */
3533        return peer;
3534}
3535
3536static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3537{
3538        struct p_protocol *p = pi->data;
3539        enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3540        int p_proto, p_discard_my_data, p_two_primaries, cf;
3541        struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3542        char integrity_alg[SHARED_SECRET_MAX] = "";
3543        struct crypto_ahash *peer_integrity_tfm = NULL;
3544        void *int_dig_in = NULL, *int_dig_vv = NULL;
3545
3546        p_proto         = be32_to_cpu(p->protocol);
3547        p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3548        p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3549        p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3550        p_two_primaries = be32_to_cpu(p->two_primaries);
3551        cf              = be32_to_cpu(p->conn_flags);
3552        p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3553
3554        if (connection->agreed_pro_version >= 87) {
3555                int err;
3556
3557                if (pi->size > sizeof(integrity_alg))
3558                        return -EIO;
3559                err = drbd_recv_all(connection, integrity_alg, pi->size);
3560                if (err)
3561                        return err;
3562                integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3563        }
3564
3565        if (pi->cmd != P_PROTOCOL_UPDATE) {
3566                clear_bit(CONN_DRY_RUN, &connection->flags);
3567
3568                if (cf & CF_DRY_RUN)
3569                        set_bit(CONN_DRY_RUN, &connection->flags);
3570
3571                rcu_read_lock();
3572                nc = rcu_dereference(connection->net_conf);
3573
3574                if (p_proto != nc->wire_protocol) {
3575                        drbd_err(connection, "incompatible %s settings\n", "protocol");
3576                        goto disconnect_rcu_unlock;
3577                }
3578
3579                if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3580                        drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3581                        goto disconnect_rcu_unlock;
3582                }
3583
3584                if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3585                        drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3586                        goto disconnect_rcu_unlock;
3587                }
3588
3589                if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3590                        drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3591                        goto disconnect_rcu_unlock;
3592                }
3593
3594                if (p_discard_my_data && nc->discard_my_data) {
3595                        drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3596                        goto disconnect_rcu_unlock;
3597                }
3598
3599                if (p_two_primaries != nc->two_primaries) {
3600                        drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3601                        goto disconnect_rcu_unlock;
3602                }
3603
3604                if (strcmp(integrity_alg, nc->integrity_alg)) {
3605                        drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3606                        goto disconnect_rcu_unlock;
3607                }
3608
3609                rcu_read_unlock();
3610        }
3611
3612        if (integrity_alg[0]) {
3613                int hash_size;
3614
3615                /*
3616                 * We can only change the peer data integrity algorithm
3617                 * here.  Changing our own data integrity algorithm
3618                 * requires that we send a P_PROTOCOL_UPDATE packet at
3619                 * the same time; otherwise, the peer has no way to
3620                 * tell between which packets the algorithm should
3621                 * change.
3622                 */
3623
3624                peer_integrity_tfm = crypto_alloc_ahash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3625                if (IS_ERR(peer_integrity_tfm)) {
3626                        peer_integrity_tfm = NULL;
3627                        drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3628                                 integrity_alg);
3629                        goto disconnect;
3630                }
3631
3632                hash_size = crypto_ahash_digestsize(peer_integrity_tfm);
3633                int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3634                int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3635                if (!(int_dig_in && int_dig_vv)) {
3636                        drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3637                        goto disconnect;
3638                }
3639        }
3640
3641        new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3642        if (!new_net_conf) {
3643                drbd_err(connection, "Allocation of new net_conf failed\n");
3644                goto disconnect;
3645        }
3646
3647        mutex_lock(&connection->data.mutex);
3648        mutex_lock(&connection->resource->conf_update);
3649        old_net_conf = connection->net_conf;
3650        *new_net_conf = *old_net_conf;
3651
3652        new_net_conf->wire_protocol = p_proto;
3653        new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3654        new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3655        new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3656        new_net_conf->two_primaries = p_two_primaries;
3657
3658        rcu_assign_pointer(connection->net_conf, new_net_conf);
3659        mutex_unlock(&connection->resource->conf_update);
3660        mutex_unlock(&connection->data.mutex);
3661
3662        crypto_free_ahash(connection->peer_integrity_tfm);
3663        kfree(connection->int_dig_in);
3664        kfree(connection->int_dig_vv);
3665        connection->peer_integrity_tfm = peer_integrity_tfm;
3666        connection->int_dig_in = int_dig_in;
3667        connection->int_dig_vv = int_dig_vv;
3668
3669        if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3670                drbd_info(connection, "peer data-integrity-alg: %s\n",
3671                          integrity_alg[0] ? integrity_alg : "(none)");
3672
3673        synchronize_rcu();
3674        kfree(old_net_conf);
3675        return 0;
3676
3677disconnect_rcu_unlock:
3678        rcu_read_unlock();
3679disconnect:
3680        crypto_free_ahash(peer_integrity_tfm);
3681        kfree(int_dig_in);
3682        kfree(int_dig_vv);
3683        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3684        return -EIO;
3685}
3686
3687/* helper function
3688 * input: alg name, feature name
3689 * return: NULL (alg name was "")
3690 *         ERR_PTR(error) if something goes wrong
3691 *         or the crypto hash ptr, if it worked out ok. */
3692static struct crypto_ahash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3693                const char *alg, const char *name)
3694{
3695        struct crypto_ahash *tfm;
3696
3697        if (!alg[0])
3698                return NULL;
3699
3700        tfm = crypto_alloc_ahash(alg, 0, CRYPTO_ALG_ASYNC);
3701        if (IS_ERR(tfm)) {
3702                drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3703                        alg, name, PTR_ERR(tfm));
3704                return tfm;
3705        }
3706        return tfm;
3707}
3708
3709static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3710{
3711        void *buffer = connection->data.rbuf;
3712        int size = pi->size;
3713
3714        while (size) {
3715                int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3716                s = drbd_recv(connection, buffer, s);
3717                if (s <= 0) {
3718                        if (s < 0)
3719                                return s;
3720                        break;
3721                }
3722                size -= s;
3723        }
3724        if (size)
3725                return -EIO;
3726        return 0;
3727}
3728
3729/*
3730 * config_unknown_volume  -  device configuration command for unknown volume
3731 *
3732 * When a device is added to an existing connection, the node on which the
3733 * device is added first will send configuration commands to its peer but the
3734 * peer will not know about the device yet.  It will warn and ignore these
3735 * commands.  Once the device is added on the second node, the second node will
3736 * send the same device configuration commands, but in the other direction.
3737 *
3738 * (We can also end up here if drbd is misconfigured.)
3739 */
3740static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3741{
3742        drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3743                  cmdname(pi->cmd), pi->vnr);
3744        return ignore_remaining_packet(connection, pi);
3745}
3746
3747static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3748{
3749        struct drbd_peer_device *peer_device;
3750        struct drbd_device *device;
3751        struct p_rs_param_95 *p;
3752        unsigned int header_size, data_size, exp_max_sz;
3753        struct crypto_ahash *verify_tfm = NULL;
3754        struct crypto_ahash *csums_tfm = NULL;
3755        struct net_conf *old_net_conf, *new_net_conf = NULL;
3756        struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3757        const int apv = connection->agreed_pro_version;
3758        struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3759        int fifo_size = 0;
3760        int err;
3761
3762        peer_device = conn_peer_device(connection, pi->vnr);
3763        if (!peer_device)
3764                return config_unknown_volume(connection, pi);
3765        device = peer_device->device;
3766
3767        exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3768                    : apv == 88 ? sizeof(struct p_rs_param)
3769                                        + SHARED_SECRET_MAX
3770                    : apv <= 94 ? sizeof(struct p_rs_param_89)
3771                    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3772
3773        if (pi->size > exp_max_sz) {
3774                drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3775                    pi->size, exp_max_sz);
3776                return -EIO;
3777        }
3778
3779        if (apv <= 88) {
3780                header_size = sizeof(struct p_rs_param);
3781                data_size = pi->size - header_size;
3782        } else if (apv <= 94) {
3783                header_size = sizeof(struct p_rs_param_89);
3784                data_size = pi->size - header_size;
3785                D_ASSERT(device, data_size == 0);
3786        } else {
3787                header_size = sizeof(struct p_rs_param_95);
3788                data_size = pi->size - header_size;
3789                D_ASSERT(device, data_size == 0);
3790        }
3791
3792        /* initialize verify_alg and csums_alg */
3793        p = pi->data;
3794        memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3795
3796        err = drbd_recv_all(peer_device->connection, p, header_size);
3797        if (err)
3798                return err;
3799
3800        mutex_lock(&connection->resource->conf_update);
3801        old_net_conf = peer_device->connection->net_conf;
3802        if (get_ldev(device)) {
3803                new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3804                if (!new_disk_conf) {
3805                        put_ldev(device);
3806                        mutex_unlock(&connection->resource->conf_update);
3807                        drbd_err(device, "Allocation of new disk_conf failed\n");
3808                        return -ENOMEM;
3809                }
3810
3811                old_disk_conf = device->ldev->disk_conf;
3812                *new_disk_conf = *old_disk_conf;
3813
3814                new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3815        }
3816
3817        if (apv >= 88) {
3818                if (apv == 88) {
3819                        if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3820                                drbd_err(device, "verify-alg of wrong size, "
3821                                        "peer wants %u, accepting only up to %u byte\n",
3822                                        data_size, SHARED_SECRET_MAX);
3823                                err = -EIO;
3824                                goto reconnect;
3825                        }
3826
3827                        err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3828                        if (err)
3829                                goto reconnect;
3830                        /* we expect NUL terminated string */
3831                        /* but just in case someone tries to be evil */
3832                        D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3833                        p->verify_alg[data_size-1] = 0;
3834
3835                } else /* apv >= 89 */ {
3836                        /* we still expect NUL terminated strings */
3837                        /* but just in case someone tries to be evil */
3838                        D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3839                        D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3840                        p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3841                        p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3842                }
3843
3844                if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3845                        if (device->state.conn == C_WF_REPORT_PARAMS) {
3846                                drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3847                                    old_net_conf->verify_alg, p->verify_alg);
3848                                goto disconnect;
3849                        }
3850                        verify_tfm = drbd_crypto_alloc_digest_safe(device,
3851                                        p->verify_alg, "verify-alg");
3852                        if (IS_ERR(verify_tfm)) {
3853                                verify_tfm = NULL;
3854                                goto disconnect;
3855                        }
3856                }
3857
3858                if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3859                        if (device->state.conn == C_WF_REPORT_PARAMS) {
3860                                drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3861                                    old_net_conf->csums_alg, p->csums_alg);
3862                                goto disconnect;
3863                        }
3864                        csums_tfm = drbd_crypto_alloc_digest_safe(device,
3865                                        p->csums_alg, "csums-alg");
3866                        if (IS_ERR(csums_tfm)) {
3867                                csums_tfm = NULL;
3868                                goto disconnect;
3869                        }
3870                }
3871
3872                if (apv > 94 && new_disk_conf) {
3873                        new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3874                        new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3875                        new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3876                        new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3877
3878                        fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3879                        if (fifo_size != device->rs_plan_s->size) {
3880                                new_plan = fifo_alloc(fifo_size);
3881                                if (!new_plan) {
3882                                        drbd_err(device, "kmalloc of fifo_buffer failed");
3883                                        put_ldev(device);
3884                                        goto disconnect;
3885                                }
3886                        }
3887                }
3888
3889                if (verify_tfm || csums_tfm) {
3890                        new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3891                        if (!new_net_conf) {
3892                                drbd_err(device, "Allocation of new net_conf failed\n");
3893                                goto disconnect;
3894                        }
3895
3896                        *new_net_conf = *old_net_conf;
3897
3898                        if (verify_tfm) {
3899                                strcpy(new_net_conf->verify_alg, p->verify_alg);
3900                                new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3901                                crypto_free_ahash(peer_device->connection->verify_tfm);
3902                                peer_device->connection->verify_tfm = verify_tfm;
3903                                drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3904                        }
3905                        if (csums_tfm) {
3906                                strcpy(new_net_conf->csums_alg, p->csums_alg);
3907                                new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3908                                crypto_free_ahash(peer_device->connection->csums_tfm);
3909                                peer_device->connection->csums_tfm = csums_tfm;
3910                                drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3911                        }
3912                        rcu_assign_pointer(connection->net_conf, new_net_conf);
3913                }
3914        }
3915
3916        if (new_disk_conf) {
3917                rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3918                put_ldev(device);
3919        }
3920
3921        if (new_plan) {
3922                old_plan = device->rs_plan_s;
3923                rcu_assign_pointer(device->rs_plan_s, new_plan);
3924        }
3925
3926        mutex_unlock(&connection->resource->conf_update);
3927        synchronize_rcu();
3928        if (new_net_conf)
3929                kfree(old_net_conf);
3930        kfree(old_disk_conf);
3931        kfree(old_plan);
3932
3933        return 0;
3934
3935reconnect:
3936        if (new_disk_conf) {
3937                put_ldev(device);
3938                kfree(new_disk_conf);
3939        }
3940        mutex_unlock(&connection->resource->conf_update);
3941        return -EIO;
3942
3943disconnect:
3944        kfree(new_plan);
3945        if (new_disk_conf) {
3946                put_ldev(device);
3947                kfree(new_disk_conf);
3948        }
3949        mutex_unlock(&connection->resource->conf_update);
3950        /* just for completeness: actually not needed,
3951         * as this is not reached if csums_tfm was ok. */
3952        crypto_free_ahash(csums_tfm);
3953        /* but free the verify_tfm again, if csums_tfm did not work out */
3954        crypto_free_ahash(verify_tfm);
3955        conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3956        return -EIO;
3957}
3958
3959/* warn if the arguments differ by more than 12.5% */
3960static void warn_if_differ_considerably(struct drbd_device *device,
3961        const char *s, sector_t a, sector_t b)
3962{
3963        sector_t d;
3964        if (a == 0 || b == 0)
3965                return;
3966        d = (a > b) ? (a - b) : (b - a);
3967        if (d > (a>>3) || d > (b>>3))
3968                drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3969                     (unsigned long long)a, (unsigned long long)b);
3970}
3971
3972static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3973{
3974        struct drbd_peer_device *peer_device;
3975        struct drbd_device *device;
3976        struct p_sizes *p = pi->data;
3977        struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ? p->qlim : NULL;
3978        enum determine_dev_size dd = DS_UNCHANGED;
3979        sector_t p_size, p_usize, p_csize, my_usize;
3980        int ldsc = 0; /* local disk size changed */
3981        enum dds_flags ddsf;
3982
3983        peer_device = conn_peer_device(connection, pi->vnr);
3984        if (!peer_device)
3985                return config_unknown_volume(connection, pi);
3986        device = peer_device->device;
3987
3988        p_size = be64_to_cpu(p->d_size);
3989        p_usize = be64_to_cpu(p->u_size);
3990        p_csize = be64_to_cpu(p->c_size);
3991
3992        /* just store the peer's disk size for now.
3993         * we still need to figure out whether we accept that. */
3994        device->p_size = p_size;
3995
3996        if (get_ldev(device)) {
3997                sector_t new_size, cur_size;
3998                rcu_read_lock();
3999                my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
4000                rcu_read_unlock();
4001
4002                warn_if_differ_considerably(device, "lower level device sizes",
4003                           p_size, drbd_get_max_capacity(device->ldev));
4004                warn_if_differ_considerably(device, "user requested size",
4005                                            p_usize, my_usize);
4006
4007                /* if this is the first connect, or an otherwise expected
4008                 * param exchange, choose the minimum */
4009                if (device->state.conn == C_WF_REPORT_PARAMS)
4010                        p_usize = min_not_zero(my_usize, p_usize);
4011
4012                /* Never shrink a device with usable data during connect.
4013                   But allow online shrinking if we are connected. */
4014                new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0);
4015                cur_size = drbd_get_capacity(device->this_bdev);
4016                if (new_size < cur_size &&
4017                    device->state.disk >= D_OUTDATED &&
4018                    device->state.conn < C_CONNECTED) {
4019                        drbd_err(device, "The peer's disk size is too small! (%llu < %llu sectors)\n",
4020                                        (unsigned long long)new_size, (unsigned long long)cur_size);
4021                        conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4022                        put_ldev(device);
4023                        return -EIO;
4024                }
4025
4026                if (my_usize != p_usize) {
4027                        struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
4028
4029                        new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
4030                        if (!new_disk_conf) {
4031                                drbd_err(device, "Allocation of new disk_conf failed\n");
4032                                put_ldev(device);
4033                                return -ENOMEM;
4034                        }
4035
4036                        mutex_lock(&connection->resource->conf_update);
4037                        old_disk_conf = device->ldev->disk_conf;
4038                        *new_disk_conf = *old_disk_conf;
4039                        new_disk_conf->disk_size = p_usize;
4040
4041                        rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
4042                        mutex_unlock(&connection->resource->conf_update);
4043                        synchronize_rcu();
4044                        kfree(old_disk_conf);
4045
4046                        drbd_info(device, "Peer sets u_size to %lu sectors\n",
4047                                 (unsigned long)my_usize);
4048                }
4049
4050                put_ldev(device);
4051        }
4052
4053        device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
4054        /* Leave drbd_reconsider_queue_parameters() before drbd_determine_dev_size().
4055           In case we cleared the QUEUE_FLAG_DISCARD from our queue in
4056           drbd_reconsider_queue_parameters(), we can be sure that after
4057           drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
4058
4059        ddsf = be16_to_cpu(p->dds_flags);
4060        if (get_ldev(device)) {
4061                drbd_reconsider_queue_parameters(device, device->ldev, o);
4062                dd = drbd_determine_dev_size(device, ddsf, NULL);
4063                put_ldev(device);
4064                if (dd == DS_ERROR)
4065                        return -EIO;
4066                drbd_md_sync(device);
4067        } else {
4068                /*
4069                 * I am diskless, need to accept the peer's *current* size.
4070                 * I must NOT accept the peers backing disk size,
4071                 * it may have been larger than mine all along...
4072                 *
4073                 * At this point, the peer knows more about my disk, or at
4074                 * least about what we last agreed upon, than myself.
4075                 * So if his c_size is less than his d_size, the most likely
4076                 * reason is that *my* d_size was smaller last time we checked.
4077                 *
4078                 * However, if he sends a zero current size,
4079                 * take his (user-capped or) backing disk size anyways.
4080                 */
4081                drbd_reconsider_queue_parameters(device, NULL, o);
4082                drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size);
4083        }
4084
4085        if (get_ldev(device)) {
4086                if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
4087                        device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
4088                        ldsc = 1;
4089                }
4090
4091                put_ldev(device);
4092        }
4093
4094        if (device->state.conn > C_WF_REPORT_PARAMS) {
4095                if (be64_to_cpu(p->c_size) !=
4096                    drbd_get_capacity(device->this_bdev) || ldsc) {
4097                        /* we have different sizes, probably peer
4098                         * needs to know my new size... */
4099                        drbd_send_sizes(peer_device, 0, ddsf);
4100                }
4101                if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
4102                    (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
4103                        if (device->state.pdsk >= D_INCONSISTENT &&
4104                            device->state.disk >= D_INCONSISTENT) {
4105                                if (ddsf & DDSF_NO_RESYNC)
4106                                        drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
4107                                else
4108                                        resync_after_online_grow(device);
4109                        } else
4110                                set_bit(RESYNC_AFTER_NEG, &device->flags);
4111                }
4112        }
4113
4114        return 0;
4115}
4116
4117static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
4118{
4119        struct drbd_peer_device *peer_device;
4120        struct drbd_device *device;
4121        struct p_uuids *p = pi->data;
4122        u64 *p_uuid;
4123        int i, updated_uuids = 0;
4124
4125        peer_device = conn_peer_device(connection, pi->vnr);
4126        if (!peer_device)
4127                return config_unknown_volume(connection, pi);
4128        device = peer_device->device;
4129
4130        p_uuid = kmalloc_array(UI_EXTENDED_SIZE, sizeof(*p_uuid), GFP_NOIO);
4131        if (!p_uuid) {
4132                drbd_err(device, "kmalloc of p_uuid failed\n");
4133                return false;
4134        }
4135
4136        for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
4137                p_uuid[i] = be64_to_cpu(p->uuid[i]);
4138
4139        kfree(device->p_uuid);
4140        device->p_uuid = p_uuid;
4141
4142        if (device->state.conn < C_CONNECTED &&
4143            device->state.disk < D_INCONSISTENT &&
4144            device->state.role == R_PRIMARY &&
4145            (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
4146                drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
4147                    (unsigned long long)device->ed_uuid);
4148                conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4149                return -EIO;
4150        }
4151
4152        if (get_ldev(device)) {
4153                int skip_initial_sync =
4154                        device->state.conn == C_CONNECTED &&
4155                        peer_device->connection->agreed_pro_version >= 90 &&
4156                        device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
4157                        (p_uuid[UI_FLAGS] & 8);
4158                if (skip_initial_sync) {
4159                        drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
4160                        drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
4161                                        "clear_n_write from receive_uuids",
4162                                        BM_LOCKED_TEST_ALLOWED);
4163                        _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
4164                        _drbd_uuid_set(device, UI_BITMAP, 0);
4165                        _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
4166                                        CS_VERBOSE, NULL);
4167                        drbd_md_sync(device);
4168                        updated_uuids = 1;
4169                }
4170                put_ldev(device);
4171        } else if (device->state.disk < D_INCONSISTENT &&
4172                   device->state.role == R_PRIMARY) {
4173                /* I am a diskless primary, the peer just created a new current UUID
4174                   for me. */
4175                updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4176        }
4177
4178        /* Before we test for the disk state, we should wait until an eventually
4179           ongoing cluster wide state change is finished. That is important if
4180           we are primary and are detaching from our disk. We need to see the
4181           new disk state... */
4182        mutex_lock(device->state_mutex);
4183        mutex_unlock(device->state_mutex);
4184        if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
4185                updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4186
4187        if (updated_uuids)
4188                drbd_print_uuids(device, "receiver updated UUIDs to");
4189
4190        return 0;
4191}
4192
4193/**
4194 * convert_state() - Converts the peer's view of the cluster state to our point of view
4195 * @ps:         The state as seen by the peer.
4196 */
4197static union drbd_state convert_state(union drbd_state ps)
4198{
4199        union drbd_state ms;
4200
4201        static enum drbd_conns c_tab[] = {
4202                [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
4203                [C_CONNECTED] = C_CONNECTED,
4204
4205                [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
4206                [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
4207                [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
4208                [C_VERIFY_S]       = C_VERIFY_T,
4209                [C_MASK]   = C_MASK,
4210        };
4211
4212        ms.i = ps.i;
4213
4214        ms.conn = c_tab[ps.conn];
4215        ms.peer = ps.role;
4216        ms.role = ps.peer;
4217        ms.pdsk = ps.disk;
4218        ms.disk = ps.pdsk;
4219        ms.peer_isp = (ps.aftr_isp | ps.user_isp);
4220
4221        return ms;
4222}
4223
4224static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4225{
4226        struct drbd_peer_device *peer_device;
4227        struct drbd_device *device;
4228        struct p_req_state *p = pi->data;
4229        union drbd_state mask, val;
4230        enum drbd_state_rv rv;
4231
4232        peer_device = conn_peer_device(connection, pi->vnr);
4233        if (!peer_device)
4234                return -EIO;
4235        device = peer_device->device;
4236
4237        mask.i = be32_to_cpu(p->mask);
4238        val.i = be32_to_cpu(p->val);
4239
4240        if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4241            mutex_is_locked(device->state_mutex)) {
4242                drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4243                return 0;
4244        }
4245
4246        mask = convert_state(mask);
4247        val = convert_state(val);
4248
4249        rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4250        drbd_send_sr_reply(peer_device, rv);
4251
4252        drbd_md_sync(device);
4253
4254        return 0;
4255}
4256
4257static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4258{
4259        struct p_req_state *p = pi->data;
4260        union drbd_state mask, val;
4261        enum drbd_state_rv rv;
4262
4263        mask.i = be32_to_cpu(p->mask);
4264        val.i = be32_to_cpu(p->val);
4265
4266        if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4267            mutex_is_locked(&connection->cstate_mutex)) {
4268                conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4269                return 0;
4270        }
4271
4272        mask = convert_state(mask);
4273        val = convert_state(val);
4274
4275        rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4276        conn_send_sr_reply(connection, rv);
4277
4278        return 0;
4279}
4280
4281static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4282{
4283        struct drbd_peer_device *peer_device;
4284        struct drbd_device *device;
4285        struct p_state *p = pi->data;
4286        union drbd_state os, ns, peer_state;
4287        enum drbd_disk_state real_peer_disk;
4288        enum chg_state_flags cs_flags;
4289        int rv;
4290
4291        peer_device = conn_peer_device(connection, pi->vnr);
4292        if (!peer_device)
4293                return config_unknown_volume(connection, pi);
4294        device = peer_device->device;
4295
4296        peer_state.i = be32_to_cpu(p->state);
4297
4298        real_peer_disk = peer_state.disk;
4299        if (peer_state.disk == D_NEGOTIATING) {
4300                real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4301                drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4302        }
4303
4304        spin_lock_irq(&device->resource->req_lock);
4305 retry:
4306        os = ns = drbd_read_state(device);
4307        spin_unlock_irq(&device->resource->req_lock);
4308
4309        /* If some other part of the code (ack_receiver thread, timeout)
4310         * already decided to close the connection again,
4311         * we must not "re-establish" it here. */
4312        if (os.conn <= C_TEAR_DOWN)
4313                return -ECONNRESET;
4314
4315        /* If this is the "end of sync" confirmation, usually the peer disk
4316         * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4317         * set) resync started in PausedSyncT, or if the timing of pause-/
4318         * unpause-sync events has been "just right", the peer disk may
4319         * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4320         */
4321        if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4322            real_peer_disk == D_UP_TO_DATE &&
4323            os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4324                /* If we are (becoming) SyncSource, but peer is still in sync
4325                 * preparation, ignore its uptodate-ness to avoid flapping, it
4326                 * will change to inconsistent once the peer reaches active
4327                 * syncing states.
4328                 * It may have changed syncer-paused flags, however, so we
4329                 * cannot ignore this completely. */
4330                if (peer_state.conn > C_CONNECTED &&
4331                    peer_state.conn < C_SYNC_SOURCE)
4332                        real_peer_disk = D_INCONSISTENT;
4333
4334                /* if peer_state changes to connected at the same time,
4335                 * it explicitly notifies us that it finished resync.
4336                 * Maybe we should finish it up, too? */
4337                else if (os.conn >= C_SYNC_SOURCE &&
4338                         peer_state.conn == C_CONNECTED) {
4339                        if (drbd_bm_total_weight(device) <= device->rs_failed)
4340                                drbd_resync_finished(device);
4341                        return 0;
4342                }
4343        }
4344
4345        /* explicit verify finished notification, stop sector reached. */
4346        if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4347            peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4348                ov_out_of_sync_print(device);
4349                drbd_resync_finished(device);
4350                return 0;
4351        }
4352
4353        /* peer says his disk is inconsistent, while we think it is uptodate,
4354         * and this happens while the peer still thinks we have a sync going on,
4355         * but we think we are already done with the sync.
4356         * We ignore this to avoid flapping pdsk.
4357         * This should not happen, if the peer is a recent version of drbd. */
4358        if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4359            os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4360                real_peer_disk = D_UP_TO_DATE;
4361
4362        if (ns.conn == C_WF_REPORT_PARAMS)
4363                ns.conn = C_CONNECTED;
4364
4365        if (peer_state.conn == C_AHEAD)
4366                ns.conn = C_BEHIND;
4367
4368        if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4369            get_ldev_if_state(device, D_NEGOTIATING)) {
4370                int cr; /* consider resync */
4371
4372                /* if we established a new connection */
4373                cr  = (os.conn < C_CONNECTED);
4374                /* if we had an established connection
4375                 * and one of the nodes newly attaches a disk */
4376                cr |= (os.conn == C_CONNECTED &&
4377                       (peer_state.disk == D_NEGOTIATING ||
4378                        os.disk == D_NEGOTIATING));
4379                /* if we have both been inconsistent, and the peer has been
4380                 * forced to be UpToDate with --overwrite-data */
4381                cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4382                /* if we had been plain connected, and the admin requested to
4383                 * start a sync by "invalidate" or "invalidate-remote" */
4384                cr |= (os.conn == C_CONNECTED &&
4385                                (peer_state.conn >= C_STARTING_SYNC_S &&
4386                                 peer_state.conn <= C_WF_BITMAP_T));
4387
4388                if (cr)
4389                        ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4390
4391                put_ldev(device);
4392                if (ns.conn == C_MASK) {
4393                        ns.conn = C_CONNECTED;
4394                        if (device->state.disk == D_NEGOTIATING) {
4395                                drbd_force_state(device, NS(disk, D_FAILED));
4396                        } else if (peer_state.disk == D_NEGOTIATING) {
4397                                drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4398                                peer_state.disk = D_DISKLESS;
4399                                real_peer_disk = D_DISKLESS;
4400                        } else {
4401                                if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4402                                        return -EIO;
4403                                D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4404                                conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4405                                return -EIO;
4406                        }
4407                }
4408        }
4409
4410        spin_lock_irq(&device->resource->req_lock);
4411        if (os.i != drbd_read_state(device).i)
4412                goto retry;
4413        clear_bit(CONSIDER_RESYNC, &device->flags);
4414        ns.peer = peer_state.role;
4415        ns.pdsk = real_peer_disk;
4416        ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4417        if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4418                ns.disk = device->new_state_tmp.disk;
4419        cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4420        if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4421            test_bit(NEW_CUR_UUID, &device->flags)) {
4422                /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4423                   for temporal network outages! */
4424                spin_unlock_irq(&device->resource->req_lock);
4425                drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4426                tl_clear(peer_device->connection);
4427                drbd_uuid_new_current(device);
4428                clear_bit(NEW_CUR_UUID, &device->flags);
4429                conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4430                return -EIO;
4431        }
4432        rv = _drbd_set_state(device, ns, cs_flags, NULL);
4433        ns = drbd_read_state(device);
4434        spin_unlock_irq(&device->resource->req_lock);
4435
4436        if (rv < SS_SUCCESS) {
4437                conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4438                return -EIO;
4439        }
4440
4441        if (os.conn > C_WF_REPORT_PARAMS) {
4442                if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4443                    peer_state.disk != D_NEGOTIATING ) {
4444                        /* we want resync, peer has not yet decided to sync... */
4445                        /* Nowadays only used when forcing a node into primary role and
4446                           setting its disk to UpToDate with that */
4447                        drbd_send_uuids(peer_device);
4448                        drbd_send_current_state(peer_device);
4449                }
4450        }
4451
4452        clear_bit(DISCARD_MY_DATA, &device->flags);
4453
4454        drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4455
4456        return 0;
4457}
4458
4459static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4460{
4461        struct drbd_peer_device *peer_device;
4462        struct drbd_device *device;
4463        struct p_rs_uuid *p = pi->data;
4464
4465        peer_device = conn_peer_device(connection, pi->vnr);
4466        if (!peer_device)
4467                return -EIO;
4468        device = peer_device->device;
4469
4470        wait_event(device->misc_wait,
4471                   device->state.conn == C_WF_SYNC_UUID ||
4472                   device->state.conn == C_BEHIND ||
4473                   device->state.conn < C_CONNECTED ||
4474                   device->state.disk < D_NEGOTIATING);
4475
4476        /* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4477
4478        /* Here the _drbd_uuid_ functions are right, current should
4479           _not_ be rotated into the history */
4480        if (get_ldev_if_state(device, D_NEGOTIATING)) {
4481                _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4482                _drbd_uuid_set(device, UI_BITMAP, 0UL);
4483
4484                drbd_print_uuids(device, "updated sync uuid");
4485                drbd_start_resync(device, C_SYNC_TARGET);
4486
4487                put_ldev(device);
4488        } else
4489                drbd_err(device, "Ignoring SyncUUID packet!\n");
4490
4491        return 0;
4492}
4493
4494/**
4495 * receive_bitmap_plain
4496 *
4497 * Return 0 when done, 1 when another iteration is needed, and a negative error
4498 * code upon failure.
4499 */
4500static int
4501receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4502                     unsigned long *p, struct bm_xfer_ctx *c)
4503{
4504        unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4505                                 drbd_header_size(peer_device->connection);
4506        unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4507                                       c->bm_words - c->word_offset);
4508        unsigned int want = num_words * sizeof(*p);
4509        int err;
4510
4511        if (want != size) {
4512                drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4513                return -EIO;
4514        }
4515        if (want == 0)
4516                return 0;
4517        err = drbd_recv_all(peer_device->connection, p, want);
4518        if (err)
4519                return err;
4520
4521        drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4522
4523        c->word_offset += num_words;
4524        c->bit_offset = c->word_offset * BITS_PER_LONG;
4525        if (c->bit_offset > c->bm_bits)
4526                c->bit_offset = c->bm_bits;
4527
4528        return 1;
4529}
4530
4531static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4532{
4533        return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4534}
4535
4536static int dcbp_get_start(struct p_compressed_bm *p)
4537{
4538        return (p->encoding & 0x80) != 0;
4539}
4540
4541static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4542{
4543        return (p->encoding >> 4) & 0x7;
4544}
4545
4546/**
4547 * recv_bm_rle_bits
4548 *
4549 * Return 0 when done, 1 when another iteration is needed, and a negative error
4550 * code upon failure.
4551 */
4552static int
4553recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4554                struct p_compressed_bm *p,
4555                 struct bm_xfer_ctx *c,
4556                 unsigned int len)
4557{
4558        struct bitstream bs;
4559        u64 look_ahead;
4560        u64 rl;
4561        u64 tmp;
4562        unsigned long s = c->bit_offset;
4563        unsigned long e;
4564        int toggle = dcbp_get_start(p);
4565        int have;
4566        int bits;
4567
4568        bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4569
4570        bits = bitstream_get_bits(&bs, &look_ahead, 64);
4571        if (bits < 0)
4572                return -EIO;
4573
4574        for (have = bits; have > 0; s += rl, toggle = !toggle) {
4575                bits = vli_decode_bits(&rl, look_ahead);
4576                if (bits <= 0)
4577                        return -EIO;
4578
4579                if (toggle) {
4580                        e = s + rl -1;
4581                        if (e >= c->bm_bits) {
4582                                drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4583                                return -EIO;
4584                        }
4585                        _drbd_bm_set_bits(peer_device->device, s, e);
4586                }
4587
4588                if (have < bits) {
4589                        drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4590                                have, bits, look_ahead,
4591                                (unsigned int)(bs.cur.b - p->code),
4592                                (unsigned int)bs.buf_len);
4593                        return -EIO;
4594                }
4595                /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4596                if (likely(bits < 64))
4597                        look_ahead >>= bits;
4598                else
4599                        look_ahead = 0;
4600                have -= bits;
4601
4602                bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4603                if (bits < 0)
4604                        return -EIO;
4605                look_ahead |= tmp << have;
4606                have += bits;
4607        }
4608
4609        c->bit_offset = s;
4610        bm_xfer_ctx_bit_to_word_offset(c);
4611
4612        return (s != c->bm_bits);
4613}
4614
4615/**
4616 * decode_bitmap_c
4617 *
4618 * Return 0 when done, 1 when another iteration is needed, and a negative error
4619 * code upon failure.
4620 */
4621static int
4622decode_bitmap_c(struct drbd_peer_device *peer_device,
4623                struct p_compressed_bm *p,
4624                struct bm_xfer_ctx *c,
4625                unsigned int len)
4626{
4627        if (dcbp_get_code(p) == RLE_VLI_Bits)
4628                return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4629
4630        /* other variants had been implemented for evaluation,
4631         * but have been dropped as this one turned out to be "best"
4632         * during all our tests. */
4633
4634        drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4635        conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4636        return -EIO;
4637}
4638
4639void INFO_bm_xfer_stats(struct drbd_device *device,
4640                const char *direction, struct bm_xfer_ctx *c)
4641{
4642        /* what would it take to transfer it "plaintext" */
4643        unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4644        unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4645        unsigned int plain =
4646                header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4647                c->bm_words * sizeof(unsigned long);
4648        unsigned int total = c->bytes[0] + c->bytes[1];
4649        unsigned int r;
4650
4651        /* total can not be zero. but just in case: */
4652        if (total == 0)
4653                return;
4654
4655        /* don't report if not compressed */
4656        if (total >= plain)
4657                return;
4658
4659        /* total < plain. check for overflow, still */
4660        r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4661                                    : (1000 * total / plain);
4662
4663        if (r > 1000)
4664                r = 1000;
4665
4666        r = 1000 - r;
4667        drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4668             "total %u; compression: %u.%u%%\n",
4669                        direction,
4670                        c->bytes[1], c->packets[1],
4671                        c->bytes[0], c->packets[0],
4672                        total, r/10, r % 10);
4673}
4674
4675/* Since we are processing the bitfield from lower addresses to higher,
4676   it does not matter if the process it in 32 bit chunks or 64 bit
4677   chunks as long as it is little endian. (Understand it as byte stream,
4678   beginning with the lowest byte...) If we would use big endian
4679   we would need to process it from the highest address to the lowest,
4680   in order to be agnostic to the 32 vs 64 bits issue.
4681
4682   returns 0 on failure, 1 if we successfully received it. */
4683static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4684{
4685        struct drbd_peer_device *peer_device;
4686        struct drbd_device *device;
4687        struct bm_xfer_ctx c;
4688        int err;
4689
4690        peer_device = conn_peer_device(connection, pi->vnr);
4691        if (!peer_device)
4692                return -EIO;
4693        device = peer_device->device;
4694
4695        drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4696        /* you are supposed to send additional out-of-sync information
4697         * if you actually set bits during this phase */
4698
4699        c = (struct bm_xfer_ctx) {
4700                .bm_bits = drbd_bm_bits(device),
4701                .bm_words = drbd_bm_words(device),
4702        };
4703
4704        for(;;) {
4705                if (pi->cmd == P_BITMAP)
4706                        err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4707                else if (pi->cmd == P_COMPRESSED_BITMAP) {
4708                        /* MAYBE: sanity check that we speak proto >= 90,
4709                         * and the feature is enabled! */
4710                        struct p_compressed_bm *p = pi->data;
4711
4712                        if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4713                                drbd_err(device, "ReportCBitmap packet too large\n");
4714                                err = -EIO;
4715                                goto out;
4716                        }
4717                        if (pi->size <= sizeof(*p)) {
4718                                drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4719                                err = -EIO;
4720                                goto out;
4721                        }
4722                        err = drbd_recv_all(peer_device->connection, p, pi->size);
4723                        if (err)
4724                               goto out;
4725                        err = decode_bitmap_c(peer_device, p, &c, pi->size);
4726                } else {
4727                        drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4728                        err = -EIO;
4729                        goto out;
4730                }
4731
4732                c.packets[pi->cmd == P_BITMAP]++;
4733                c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4734
4735                if (err <= 0) {
4736                        if (err < 0)
4737                                goto out;
4738                        break;
4739                }
4740                err = drbd_recv_header(peer_device->connection, pi);
4741                if (err)
4742                        goto out;
4743        }
4744
4745        INFO_bm_xfer_stats(device, "receive", &c);
4746
4747        if (device->state.conn == C_WF_BITMAP_T) {
4748                enum drbd_state_rv rv;
4749
4750                err = drbd_send_bitmap(device);
4751                if (err)
4752                        goto out;
4753                /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4754                rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4755                D_ASSERT(device, rv == SS_SUCCESS);
4756        } else if (device->state.conn != C_WF_BITMAP_S) {
4757                /* admin may have requested C_DISCONNECTING,
4758                 * other threads may have noticed network errors */
4759                drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4760                    drbd_conn_str(device->state.conn));
4761        }
4762        err = 0;
4763
4764 out:
4765        drbd_bm_unlock(device);
4766        if (!err && device->state.conn == C_WF_BITMAP_S)
4767                drbd_start_resync(device, C_SYNC_SOURCE);
4768        return err;
4769}
4770
4771static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4772{
4773        drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4774                 pi->cmd, pi->size);
4775
4776        return ignore_remaining_packet(connection, pi);
4777}
4778
4779static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4780{
4781        /* Make sure we've acked all the TCP data associated
4782         * with the data requests being unplugged */
4783        drbd_tcp_quickack(connection->data.socket);
4784
4785        return 0;
4786}
4787
4788static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4789{
4790        struct drbd_peer_device *peer_device;
4791        struct drbd_device *device;
4792        struct p_block_desc *p = pi->data;
4793
4794        peer_device = conn_peer_device(connection, pi->vnr);
4795        if (!peer_device)
4796                return -EIO;
4797        device = peer_device->device;
4798
4799        switch (device->state.conn) {
4800        case C_WF_SYNC_UUID:
4801        case C_WF_BITMAP_T:
4802        case C_BEHIND:
4803                        break;
4804        default:
4805                drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4806                                drbd_conn_str(device->state.conn));
4807        }
4808
4809        drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4810
4811        return 0;
4812}
4813
4814static int receive_rs_deallocated(struct drbd_connection *connection, struct packet_info *pi)
4815{
4816        struct drbd_peer_device *peer_device;
4817        struct p_block_desc *p = pi->data;
4818        struct drbd_device *device;
4819        sector_t sector;
4820        int size, err = 0;
4821
4822        peer_device = conn_peer_device(connection, pi->vnr);
4823        if (!peer_device)
4824                return -EIO;
4825        device = peer_device->device;
4826
4827        sector = be64_to_cpu(p->sector);
4828        size = be32_to_cpu(p->blksize);
4829
4830        dec_rs_pending(device);
4831
4832        if (get_ldev(device)) {
4833                struct drbd_peer_request *peer_req;
4834                const int op = REQ_OP_WRITE_ZEROES;
4835
4836                peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector,
4837                                               size, 0, GFP_NOIO);
4838                if (!peer_req) {
4839                        put_ldev(device);
4840                        return -ENOMEM;
4841                }
4842
4843                peer_req->w.cb = e_end_resync_block;
4844                peer_req->submit_jif = jiffies;
4845                peer_req->flags |= EE_IS_TRIM;
4846
4847                spin_lock_irq(&device->resource->req_lock);
4848                list_add_tail(&peer_req->w.list, &device->sync_ee);
4849                spin_unlock_irq(&device->resource->req_lock);
4850
4851                atomic_add(pi->size >> 9, &device->rs_sect_ev);
4852                err = drbd_submit_peer_request(device, peer_req, op, 0, DRBD_FAULT_RS_WR);
4853
4854                if (err) {
4855                        spin_lock_irq(&device->resource->req_lock);
4856                        list_del(&peer_req->w.list);
4857                        spin_unlock_irq(&device->resource->req_lock);
4858
4859                        drbd_free_peer_req(device, peer_req);
4860                        put_ldev(device);
4861                        err = 0;
4862                        goto fail;
4863                }
4864
4865                inc_unacked(device);
4866
4867                /* No put_ldev() here. Gets called in drbd_endio_write_sec_final(),
4868                   as well as drbd_rs_complete_io() */
4869        } else {
4870        fail:
4871                drbd_rs_complete_io(device, sector);
4872                drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER);
4873        }
4874
4875        atomic_add(size >> 9, &device->rs_sect_in);
4876
4877        return err;
4878}
4879
/*
 * One entry of the receiver's dispatch table, indexed by packet command.
 * Field order matters: the table may be filled with positional initializers.
 */
struct data_cmd {
	/* non-zero: the packet may be larger than its sub header */
	int expect_payload;
	/* size of the fixed sub header drbdd() receives before calling ->fn */
	unsigned int pkt_size;
	/* handler; a non-zero return value makes drbdd() give up the connection */
	int (*fn)(struct drbd_connection *connection, struct packet_info *pi);
};
4885
4886static struct data_cmd drbd_cmd_handler[] = {
4887        [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4888        [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4889        [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4890        [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4891        [P_BITMAP]          = { 1, 0, receive_bitmap } ,
4892        [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4893        [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4894        [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4895        [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4896        [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4897        [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4898        [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4899        [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4900        [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4901        [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4902        [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4903        [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4904        [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4905        [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4906        [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4907        [P_RS_THIN_REQ]     = { 0, sizeof(struct p_block_req), receive_DataRequest },
4908        [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4909        [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4910        [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4911        [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4912        [P_TRIM]            = { 0, sizeof(struct p_trim), receive_Data },
4913        [P_RS_DEALLOCATED]  = { 0, sizeof(struct p_block_desc), receive_rs_deallocated },
4914        [P_WSAME]           = { 1, sizeof(struct p_wsame), receive_Data },
4915};
4916
4917static void drbdd(struct drbd_connection *connection)
4918{
4919        struct packet_info pi;
4920        size_t shs; /* sub header size */
4921        int err;
4922
4923        while (get_t_state(&connection->receiver) == RUNNING) {
4924                struct data_cmd const *cmd;
4925
4926                drbd_thread_current_set_cpu(&connection->receiver);
4927                update_receiver_timing_details(connection, drbd_recv_header_maybe_unplug);
4928                if (drbd_recv_header_maybe_unplug(connection, &pi))
4929                        goto err_out;
4930
4931                cmd = &drbd_cmd_handler[pi.cmd];
4932                if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4933                        drbd_err(connection, "Unexpected data packet %s (0x%04x)",
4934                                 cmdname(pi.cmd), pi.cmd);
4935                        goto err_out;
4936                }
4937
4938                shs = cmd->pkt_size;
4939                if (pi.cmd == P_SIZES && connection->agreed_features & DRBD_FF_WSAME)
4940                        shs += sizeof(struct o_qlim);
4941                if (pi.size > shs && !cmd->expect_payload) {
4942                        drbd_err(connection, "No payload expected %s l:%d\n",
4943                                 cmdname(pi.cmd), pi.size);
4944                        goto err_out;
4945                }
4946                if (pi.size < shs) {
4947                        drbd_err(connection, "%s: unexpected packet size, expected:%d received:%d\n",
4948                                 cmdname(pi.cmd), (int)shs, pi.size);
4949                        goto err_out;
4950                }
4951
4952                if (shs) {
4953                        update_receiver_timing_details(connection, drbd_recv_all_warn);
4954                        err = drbd_recv_all_warn(connection, pi.data, shs);
4955                        if (err)
4956                                goto err_out;
4957                        pi.size -= shs;
4958                }
4959
4960                update_receiver_timing_details(connection, cmd->fn);
4961                err = cmd->fn(connection, &pi);
4962                if (err) {
4963                        drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4964                                 cmdname(pi.cmd), err, pi.size);
4965                        goto err_out;
4966                }
4967        }
4968        return;
4969
4970    err_out:
4971        conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4972}
4973
4974static void conn_disconnect(struct drbd_connection *connection)
4975{
4976        struct drbd_peer_device *peer_device;
4977        enum drbd_conns oc;
4978        int vnr;
4979
4980        if (connection->cstate == C_STANDALONE)
4981                return;
4982
4983        /* We are about to start the cleanup after connection loss.
4984         * Make sure drbd_make_request knows about that.
4985         * Usually we should be in some network failure state already,
4986         * but just in case we are not, we fix it up here.
4987         */
4988        conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4989
4990        /* ack_receiver does not clean up anything. it must not interfere, either */
4991        drbd_thread_stop(&connection->ack_receiver);
4992        if (connection->ack_sender) {
4993                destroy_workqueue(connection->ack_sender);
4994                connection->ack_sender = NULL;
4995        }
4996        drbd_free_sock(connection);
4997
4998        rcu_read_lock();
4999        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5000                struct drbd_device *device = peer_device->device;
5001                kref_get(&device->kref);
5002                rcu_read_unlock();
5003                drbd_disconnected(peer_device);
5004                kref_put(&device->kref, drbd_destroy_device);
5005                rcu_read_lock();
5006        }
5007        rcu_read_unlock();
5008
5009        if (!list_empty(&connection->current_epoch->list))
5010                drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
5011        /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
5012        atomic_set(&connection->current_epoch->epoch_size, 0);
5013        connection->send.seen_any_write_yet = false;
5014
5015        drbd_info(connection, "Connection closed\n");
5016
5017        if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
5018                conn_try_outdate_peer_async(connection);
5019
5020        spin_lock_irq(&connection->resource->req_lock);
5021        oc = connection->cstate;
5022        if (oc >= C_UNCONNECTED)
5023                _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
5024
5025        spin_unlock_irq(&connection->resource->req_lock);
5026
5027        if (oc == C_DISCONNECTING)
5028                conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
5029}
5030
5031static int drbd_disconnected(struct drbd_peer_device *peer_device)
5032{
5033        struct drbd_device *device = peer_device->device;
5034        unsigned int i;
5035
5036        /* wait for current activity to cease. */
5037        spin_lock_irq(&device->resource->req_lock);
5038        _drbd_wait_ee_list_empty(device, &device->active_ee);
5039        _drbd_wait_ee_list_empty(device, &device->sync_ee);
5040        _drbd_wait_ee_list_empty(device, &device->read_ee);
5041        spin_unlock_irq(&device->resource->req_lock);
5042
5043        /* We do not have data structures that would allow us to
5044         * get the rs_pending_cnt down to 0 again.
5045         *  * On C_SYNC_TARGET we do not have any data structures describing
5046         *    the pending RSDataRequest's we have sent.
5047         *  * On C_SYNC_SOURCE there is no data structure that tracks
5048         *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
5049         *  And no, it is not the sum of the reference counts in the
5050         *  resync_LRU. The resync_LRU tracks the whole operation including
5051         *  the disk-IO, while the rs_pending_cnt only tracks the blocks
5052         *  on the fly. */
5053        drbd_rs_cancel_all(device);
5054        device->rs_total = 0;
5055        device->rs_failed = 0;
5056        atomic_set(&device->rs_pending_cnt, 0);
5057        wake_up(&device->misc_wait);
5058
5059        del_timer_sync(&device->resync_timer);
5060        resync_timer_fn(&device->resync_timer);
5061
5062        /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
5063         * w_make_resync_request etc. which may still be on the worker queue
5064         * to be "canceled" */
5065        drbd_flush_workqueue(&peer_device->connection->sender_work);
5066
5067        drbd_finish_peer_reqs(device);
5068
5069        /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
5070           might have issued a work again. The one before drbd_finish_peer_reqs() is
5071           necessary to reclain net_ee in drbd_finish_peer_reqs(). */
5072        drbd_flush_workqueue(&peer_device->connection->sender_work);
5073
5074        /* need to do it again, drbd_finish_peer_reqs() may have populated it
5075         * again via drbd_try_clear_on_disk_bm(). */
5076        drbd_rs_cancel_all(device);
5077
5078        kfree(device->p_uuid);
5079        device->p_uuid = NULL;
5080
5081        if (!drbd_suspended(device))
5082                tl_clear(peer_device->connection);
5083
5084        drbd_md_sync(device);
5085
5086        if (get_ldev(device)) {
5087                drbd_bitmap_io(device, &drbd_bm_write_copy_pages,
5088                                "write from disconnected", BM_LOCKED_CHANGE_ALLOWED);
5089                put_ldev(device);
5090        }
5091
5092        /* tcp_close and release of sendpage pages can be deferred.  I don't
5093         * want to use SO_LINGER, because apparently it can be deferred for
5094         * more than 20 seconds (longest time I checked).
5095         *
5096         * Actually we don't care for exactly when the network stack does its
5097         * put_page(), but release our reference on these pages right here.
5098         */
5099        i = drbd_free_peer_reqs(device, &device->net_ee);
5100        if (i)
5101                drbd_info(device, "net_ee not empty, killed %u entries\n", i);
5102        i = atomic_read(&device->pp_in_use_by_net);
5103        if (i)
5104                drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
5105        i = atomic_read(&device->pp_in_use);
5106        if (i)
5107                drbd_info(device, "pp_in_use = %d, expected 0\n", i);
5108
5109        D_ASSERT(device, list_empty(&device->read_ee));
5110        D_ASSERT(device, list_empty(&device->active_ee));
5111        D_ASSERT(device, list_empty(&device->sync_ee));
5112        D_ASSERT(device, list_empty(&device->done_ee));
5113
5114        return 0;
5115}
5116
5117/*
5118 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
5119 * we can agree on is stored in agreed_pro_version.
5120 *
5121 * feature flags and the reserved array should be enough room for future
5122 * enhancements of the handshake protocol, and possible plugins...
5123 *
5124 * for now, they are expected to be zero, but ignored.
5125 */
5126static int drbd_send_features(struct drbd_connection *connection)
5127{
5128        struct drbd_socket *sock;
5129        struct p_connection_features *p;
5130
5131        sock = &connection->data;
5132        p = conn_prepare_command(connection, sock);
5133        if (!p)
5134                return -EIO;
5135        memset(p, 0, sizeof(*p));
5136        p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
5137        p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
5138        p->feature_flags = cpu_to_be32(PRO_FEATURES);
5139        return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
5140}
5141
5142/*
5143 * return values:
5144 *   1 yes, we have a valid connection
5145 *   0 oops, did not work out, please try again
5146 *  -1 peer talks different language,
5147 *     no point in trying again, please go standalone.
5148 */
5149static int drbd_do_features(struct drbd_connection *connection)
5150{
5151        /* ASSERT current == connection->receiver ... */
5152        struct p_connection_features *p;
5153        const int expect = sizeof(struct p_connection_features);
5154        struct packet_info pi;
5155        int err;
5156
5157        err = drbd_send_features(connection);
5158        if (err)
5159                return 0;
5160
5161        err = drbd_recv_header(connection, &pi);
5162        if (err)
5163                return 0;
5164
5165        if (pi.cmd != P_CONNECTION_FEATURES) {
5166                drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
5167                         cmdname(pi.cmd), pi.cmd);
5168                return -1;
5169        }
5170
5171        if (pi.size != expect) {
5172                drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
5173                     expect, pi.size);
5174                return -1;
5175        }
5176
5177        p = pi.data;
5178        err = drbd_recv_all_warn(connection, p, expect);
5179        if (err)
5180                return 0;
5181
5182        p->protocol_min = be32_to_cpu(p->protocol_min);
5183        p->protocol_max = be32_to_cpu(p->protocol_max);
5184        if (p->protocol_max == 0)
5185                p->protocol_max = p->protocol_min;
5186
5187        if (PRO_VERSION_MAX < p->protocol_min ||
5188            PRO_VERSION_MIN > p->protocol_max)
5189                goto incompat;
5190
5191        connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
5192        connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
5193
5194        drbd_info(connection, "Handshake successful: "
5195             "Agreed network protocol version %d\n", connection->agreed_pro_version);
5196
5197        drbd_info(connection, "Feature flags enabled on protocol level: 0x%x%s%s%s.\n",
5198                  connection->agreed_features,
5199                  connection->agreed_features & DRBD_FF_TRIM ? " TRIM" : "",
5200                  connection->agreed_features & DRBD_FF_THIN_RESYNC ? " THIN_RESYNC" : "",
5201                  connection->agreed_features & DRBD_FF_WSAME ? " WRITE_SAME" :
5202                  connection->agreed_features ? "" : " none");
5203
5204        return 1;
5205
5206 incompat:
5207        drbd_err(connection, "incompatible DRBD dialects: "
5208            "I support %d-%d, peer supports %d-%d\n",
5209            PRO_VERSION_MIN, PRO_VERSION_MAX,
5210            p->protocol_min, p->protocol_max);
5211        return -1;
5212}
5213
5214#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
/*
 * Fallback used when the kernel provides CONFIG_CRYPTO_HMAC neither built in
 * nor as a module: the challenge/response handshake cannot be computed, so
 * tell the admin how to fix the configuration and return -1.
 */
static int drbd_do_auth(struct drbd_connection *connection)
{
        drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
        drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");

        return -1;
}
5221#else
5222#define CHALLENGE_LEN 64
5223
5224/* Return value:
5225        1 - auth succeeded,
5226        0 - failed, try again (network error),
5227        -1 - auth failed, don't try again.
5228*/
5229
5230static int drbd_do_auth(struct drbd_connection *connection)
5231{
5232        struct drbd_socket *sock;
5233        char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
5234        char *response = NULL;
5235        char *right_response = NULL;
5236        char *peers_ch = NULL;
5237        unsigned int key_len;
5238        char secret[SHARED_SECRET_MAX]; /* 64 byte */
5239        unsigned int resp_size;
5240        SHASH_DESC_ON_STACK(desc, connection->cram_hmac_tfm);
5241        struct packet_info pi;
5242        struct net_conf *nc;
5243        int err, rv;
5244
5245        /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
5246
5247        rcu_read_lock();
5248        nc = rcu_dereference(connection->net_conf);
5249        key_len = strlen(nc->shared_secret);
5250        memcpy(secret, nc->shared_secret, key_len);
5251        rcu_read_unlock();
5252
5253        desc->tfm = connection->cram_hmac_tfm;
5254        desc->flags = 0;
5255
5256        rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
5257        if (rv) {
5258                drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
5259                rv = -1;
5260                goto fail;
5261        }
5262
5263        get_random_bytes(my_challenge, CHALLENGE_LEN);
5264
5265        sock = &connection->data;
5266        if (!conn_prepare_command(connection, sock)) {
5267                rv = 0;
5268                goto fail;
5269        }
5270        rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
5271                                my_challenge, CHALLENGE_LEN);
5272        if (!rv)
5273                goto fail;
5274
5275        err = drbd_recv_header(connection, &pi);
5276        if (err) {
5277                rv = 0;
5278                goto fail;
5279        }
5280
5281        if (pi.cmd != P_AUTH_CHALLENGE) {
5282                drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
5283                         cmdname(pi.cmd), pi.cmd);
5284                rv = 0;
5285                goto fail;
5286        }
5287
5288        if (pi.size > CHALLENGE_LEN * 2) {
5289                drbd_err(connection, "expected AuthChallenge payload too big.\n");
5290                rv = -1;
5291                goto fail;
5292        }
5293
5294        if (pi.size < CHALLENGE_LEN) {
5295                drbd_err(connection, "AuthChallenge payload too small.\n");
5296                rv = -1;
5297                goto fail;
5298        }
5299
5300        peers_ch = kmalloc(pi.size, GFP_NOIO);
5301        if (peers_ch == NULL) {
5302                drbd_err(connection, "kmalloc of peers_ch failed\n");
5303                rv = -1;
5304                goto fail;
5305        }
5306
5307        err = drbd_recv_all_warn(connection, peers_ch, pi.size);
5308        if (err) {
5309                rv = 0;
5310                goto fail;
5311        }
5312
5313        if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
5314                drbd_err(connection, "Peer presented the same challenge!\n");
5315                rv = -1;
5316                goto fail;
5317        }
5318
5319        resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
5320        response = kmalloc(resp_size, GFP_NOIO);
5321        if (response == NULL) {
5322                drbd_err(connection, "kmalloc of response failed\n");
5323                rv = -1;
5324                goto fail;
5325        }
5326
5327        rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
5328        if (rv) {
5329                drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5330                rv = -1;
5331                goto fail;
5332        }
5333
5334        if (!conn_prepare_command(connection, sock)) {
5335                rv = 0;
5336                goto fail;
5337        }
5338        rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5339                                response, resp_size);
5340        if (!rv)
5341                goto fail;
5342
5343        err = drbd_recv_header(connection, &pi);
5344        if (err) {
5345                rv = 0;
5346                goto fail;
5347        }
5348
5349        if (pi.cmd != P_AUTH_RESPONSE) {
5350                drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5351                         cmdname(pi.cmd), pi.cmd);
5352                rv = 0;
5353                goto fail;
5354        }
5355
5356        if (pi.size != resp_size) {
5357                drbd_err(connection, "expected AuthResponse payload of wrong size\n");
5358                rv = 0;
5359                goto fail;
5360        }
5361
5362        err = drbd_recv_all_warn(connection, response , resp_size);
5363        if (err) {
5364                rv = 0;
5365                goto fail;
5366        }
5367
5368        right_response = kmalloc(resp_size, GFP_NOIO);
5369        if (right_response == NULL) {
5370                drbd_err(connection, "kmalloc of right_response failed\n");
5371                rv = -1;
5372                goto fail;
5373        }
5374
5375        rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
5376                                 right_response);
5377        if (rv) {
5378                drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5379                rv = -1;
5380                goto fail;
5381        }
5382
5383        rv = !memcmp(response, right_response, resp_size);
5384
5385        if (rv)
5386                drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5387                     resp_size);
5388        else
5389                rv = -1;
5390
5391 fail:
5392        kfree(peers_ch);
5393        kfree(response);
5394        kfree(right_response);
5395        shash_desc_zero(desc);
5396
5397        return rv;
5398}
5399#endif
5400
5401int drbd_receiver(struct drbd_thread *thi)
5402{
5403        struct drbd_connection *connection = thi->connection;
5404        int h;
5405
5406        drbd_info(connection, "receiver (re)started\n");
5407
5408        do {
5409                h = conn_connect(connection);
5410                if (h == 0) {
5411                        conn_disconnect(connection);
5412                        schedule_timeout_interruptible(HZ);
5413                }
5414                if (h == -1) {
5415                        drbd_warn(connection, "Discarding network configuration.\n");
5416                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5417                }
5418        } while (h == 0);
5419
5420        if (h > 0) {
5421                blk_start_plug(&connection->receiver_plug);
5422                drbdd(connection);
5423                blk_finish_plug(&connection->receiver_plug);
5424        }
5425
5426        conn_disconnect(connection);
5427
5428        drbd_info(connection, "receiver terminated\n");
5429        return 0;
5430}
5431
5432/* ********* acknowledge sender ******** */
5433
5434static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5435{
5436        struct p_req_state_reply *p = pi->data;
5437        int retcode = be32_to_cpu(p->retcode);
5438
5439        if (retcode >= SS_SUCCESS) {
5440                set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5441        } else {
5442                set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5443                drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5444                         drbd_set_st_err_str(retcode), retcode);
5445        }
5446        wake_up(&connection->ping_wait);
5447
5448        return 0;
5449}
5450
5451static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5452{
5453        struct drbd_peer_device *peer_device;
5454        struct drbd_device *device;
5455        struct p_req_state_reply *p = pi->data;
5456        int retcode = be32_to_cpu(p->retcode);
5457
5458        peer_device = conn_peer_device(connection, pi->vnr);
5459        if (!peer_device)
5460                return -EIO;
5461        device = peer_device->device;
5462
5463        if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5464                D_ASSERT(device, connection->agreed_pro_version < 100);
5465                return got_conn_RqSReply(connection, pi);
5466        }
5467
5468        if (retcode >= SS_SUCCESS) {
5469                set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5470        } else {
5471                set_bit(CL_ST_CHG_FAIL, &device->flags);
5472                drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5473                        drbd_set_st_err_str(retcode), retcode);
5474        }
5475        wake_up(&device->state_wait);
5476
5477        return 0;
5478}
5479
/* Answer a ping from the peer; the result of sending the ack is returned. */
static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
{
        return drbd_send_ping_ack(connection);
}
5485
5486static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5487{
5488        /* restore idle timeout */
5489        connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5490        if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5491                wake_up(&connection->ping_wait);
5492
5493        return 0;
5494}
5495
5496static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5497{
5498        struct drbd_peer_device *peer_device;
5499        struct drbd_device *device;
5500        struct p_block_ack *p = pi->data;
5501        sector_t sector = be64_to_cpu(p->sector);
5502        int blksize = be32_to_cpu(p->blksize);
5503
5504        peer_device = conn_peer_device(connection, pi->vnr);
5505        if (!peer_device)
5506                return -EIO;
5507        device = peer_device->device;
5508
5509        D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5510
5511        update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5512
5513        if (get_ldev(device)) {
5514                drbd_rs_complete_io(device, sector);
5515                drbd_set_in_sync(device, sector, blksize);
5516                /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5517                device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5518                put_ldev(device);
5519        }
5520        dec_rs_pending(device);
5521        atomic_add(blksize >> 9, &device->rs_sect_in);
5522
5523        return 0;
5524}
5525
5526static int
5527validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5528                              struct rb_root *root, const char *func,
5529                              enum drbd_req_event what, bool missing_ok)
5530{
5531        struct drbd_request *req;
5532        struct bio_and_error m;
5533
5534        spin_lock_irq(&device->resource->req_lock);
5535        req = find_request(device, root, id, sector, missing_ok, func);
5536        if (unlikely(!req)) {
5537                spin_unlock_irq(&device->resource->req_lock);
5538                return -EIO;
5539        }
5540        __req_mod(req, what, &m);
5541        spin_unlock_irq(&device->resource->req_lock);
5542
5543        if (m.bio)
5544                complete_master_bio(device, &m);
5545        return 0;
5546}
5547
5548static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5549{
5550        struct drbd_peer_device *peer_device;
5551        struct drbd_device *device;
5552        struct p_block_ack *p = pi->data;
5553        sector_t sector = be64_to_cpu(p->sector);
5554        int blksize = be32_to_cpu(p->blksize);
5555        enum drbd_req_event what;
5556
5557        peer_device = conn_peer_device(connection, pi->vnr);
5558        if (!peer_device)
5559                return -EIO;
5560        device = peer_device->device;
5561
5562        update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5563
5564        if (p->block_id == ID_SYNCER) {
5565                drbd_set_in_sync(device, sector, blksize);
5566                dec_rs_pending(device);
5567                return 0;
5568        }
5569        switch (pi->cmd) {
5570        case P_RS_WRITE_ACK:
5571                what = WRITE_ACKED_BY_PEER_AND_SIS;
5572                break;
5573        case P_WRITE_ACK:
5574                what = WRITE_ACKED_BY_PEER;
5575                break;
5576        case P_RECV_ACK:
5577                what = RECV_ACKED_BY_PEER;
5578                break;
5579        case P_SUPERSEDED:
5580                what = CONFLICT_RESOLVED;
5581                break;
5582        case P_RETRY_WRITE:
5583                what = POSTPONE_WRITE;
5584                break;
5585        default:
5586                BUG();
5587        }
5588
5589        return validate_req_change_req_state(device, p->block_id, sector,
5590                                             &device->write_requests, __func__,
5591                                             what, false);
5592}
5593
5594static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5595{
5596        struct drbd_peer_device *peer_device;
5597        struct drbd_device *device;
5598        struct p_block_ack *p = pi->data;
5599        sector_t sector = be64_to_cpu(p->sector);
5600        int size = be32_to_cpu(p->blksize);
5601        int err;
5602
5603        peer_device = conn_peer_device(connection, pi->vnr);
5604        if (!peer_device)
5605                return -EIO;
5606        device = peer_device->device;
5607
5608        update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5609
5610        if (p->block_id == ID_SYNCER) {
5611                dec_rs_pending(device);
5612                drbd_rs_failed_io(device, sector, size);
5613                return 0;
5614        }
5615
5616        err = validate_req_change_req_state(device, p->block_id, sector,
5617                                            &device->write_requests, __func__,
5618                                            NEG_ACKED, true);
5619        if (err) {
5620                /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5621                   The master bio might already be completed, therefore the
5622                   request is no longer in the collision hash. */
5623                /* In Protocol B we might already have got a P_RECV_ACK
5624                   but then get a P_NEG_ACK afterwards. */
5625                drbd_set_out_of_sync(device, sector, size);
5626        }
5627        return 0;
5628}
5629
5630static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5631{
5632        struct drbd_peer_device *peer_device;
5633        struct drbd_device *device;
5634        struct p_block_ack *p = pi->data;
5635        sector_t sector = be64_to_cpu(p->sector);
5636
5637        peer_device = conn_peer_device(connection, pi->vnr);
5638        if (!peer_device)
5639                return -EIO;
5640        device = peer_device->device;
5641
5642        update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5643
5644        drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5645            (unsigned long long)sector, be32_to_cpu(p->blksize));
5646
5647        return validate_req_change_req_state(device, p->block_id, sector,
5648                                             &device->read_requests, __func__,
5649                                             NEG_ACKED, false);
5650}
5651
5652static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5653{
5654        struct drbd_peer_device *peer_device;
5655        struct drbd_device *device;
5656        sector_t sector;
5657        int size;
5658        struct p_block_ack *p = pi->data;
5659
5660        peer_device = conn_peer_device(connection, pi->vnr);
5661        if (!peer_device)
5662                return -EIO;
5663        device = peer_device->device;
5664
5665        sector = be64_to_cpu(p->sector);
5666        size = be32_to_cpu(p->blksize);
5667
5668        update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5669
5670        dec_rs_pending(device);
5671
5672        if (get_ldev_if_state(device, D_FAILED)) {
5673                drbd_rs_complete_io(device, sector);
5674                switch (pi->cmd) {
5675                case P_NEG_RS_DREPLY:
5676                        drbd_rs_failed_io(device, sector, size);
5677                case P_RS_CANCEL:
5678                        break;
5679                default:
5680                        BUG();
5681                }
5682                put_ldev(device);
5683        }
5684
5685        return 0;
5686}
5687
5688static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5689{
5690        struct p_barrier_ack *p = pi->data;
5691        struct drbd_peer_device *peer_device;
5692        int vnr;
5693
5694        tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5695
5696        rcu_read_lock();
5697        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5698                struct drbd_device *device = peer_device->device;
5699
5700                if (device->state.conn == C_AHEAD &&
5701                    atomic_read(&device->ap_in_flight) == 0 &&
5702                    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5703                        device->start_resync_timer.expires = jiffies + HZ;
5704                        add_timer(&device->start_resync_timer);
5705                }
5706        }
5707        rcu_read_unlock();
5708
5709        return 0;
5710}
5711
5712static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5713{
5714        struct drbd_peer_device *peer_device;
5715        struct drbd_device *device;
5716        struct p_block_ack *p = pi->data;
5717        struct drbd_device_work *dw;
5718        sector_t sector;
5719        int size;
5720
5721        peer_device = conn_peer_device(connection, pi->vnr);
5722        if (!peer_device)
5723                return -EIO;
5724        device = peer_device->device;
5725
5726        sector = be64_to_cpu(p->sector);
5727        size = be32_to_cpu(p->blksize);
5728
5729        update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5730
5731        if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5732                drbd_ov_out_of_sync_found(device, sector, size);
5733        else
5734                ov_out_of_sync_print(device);
5735
5736        if (!get_ldev(device))
5737                return 0;
5738
5739        drbd_rs_complete_io(device, sector);
5740        dec_rs_pending(device);
5741
5742        --device->ov_left;
5743
5744        /* let's advance progress step marks only for every other megabyte */
5745        if ((device->ov_left & 0x200) == 0x200)
5746                drbd_advance_rs_marks(device, device->ov_left);
5747
5748        if (device->ov_left == 0) {
5749                dw = kmalloc(sizeof(*dw), GFP_NOIO);
5750                if (dw) {
5751                        dw->w.cb = w_ov_finished;
5752                        dw->device = device;
5753                        drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5754                } else {
5755                        drbd_err(device, "kmalloc(dw) failed.");
5756                        ov_out_of_sync_print(device);
5757                        drbd_resync_finished(device);
5758                }
5759        }
5760        put_ldev(device);
5761        return 0;
5762}
5763
/* Handler for meta packets that are accepted but deliberately ignored. */
static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
{
        return 0;
}
5768
5769struct meta_sock_cmd {
5770        size_t pkt_size;
5771        int (*fn)(struct drbd_connection *connection, struct packet_info *);
5772};
5773
5774static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
5775{
5776        long t;
5777        struct net_conf *nc;
5778
5779        rcu_read_lock();
5780        nc = rcu_dereference(connection->net_conf);
5781        t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5782        rcu_read_unlock();
5783
5784        t *= HZ;
5785        if (ping_timeout)
5786                t /= 10;
5787
5788        connection->meta.socket->sk->sk_rcvtimeo = t;
5789}
5790
5791static void set_ping_timeout(struct drbd_connection *connection)
5792{
5793        set_rcvtimeo(connection, 1);
5794}
5795
5796static void set_idle_timeout(struct drbd_connection *connection)
5797{
5798        set_rcvtimeo(connection, 0);
5799}
5800
5801static struct meta_sock_cmd ack_receiver_tbl[] = {
5802        [P_PING]            = { 0, got_Ping },
5803        [P_PING_ACK]        = { 0, got_PingAck },
5804        [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5805        [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5806        [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5807        [P_SUPERSEDED]   = { sizeof(struct p_block_ack), got_BlockAck },
5808        [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5809        [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5810        [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5811        [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5812        [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5813        [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5814        [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5815        [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5816        [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5817        [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5818        [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5819};
5820
5821int drbd_ack_receiver(struct drbd_thread *thi)
5822{
5823        struct drbd_connection *connection = thi->connection;
5824        struct meta_sock_cmd *cmd = NULL;
5825        struct packet_info pi;
5826        unsigned long pre_recv_jif;
5827        int rv;
5828        void *buf    = connection->meta.rbuf;
5829        int received = 0;
5830        unsigned int header_size = drbd_header_size(connection);
5831        int expect   = header_size;
5832        bool ping_timeout_active = false;
5833        struct sched_param param = { .sched_priority = 2 };
5834
5835        rv = sched_setscheduler(current, SCHED_RR, &param);
5836        if (rv < 0)
5837                drbd_err(connection, "drbd_ack_receiver: ERROR set priority, ret=%d\n", rv);
5838
5839        while (get_t_state(thi) == RUNNING) {
5840                drbd_thread_current_set_cpu(thi);
5841
5842                conn_reclaim_net_peer_reqs(connection);
5843
5844                if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5845                        if (drbd_send_ping(connection)) {
5846                                drbd_err(connection, "drbd_send_ping has failed\n");
5847                                goto reconnect;
5848                        }
5849                        set_ping_timeout(connection);
5850                        ping_timeout_active = true;
5851                }
5852
5853                pre_recv_jif = jiffies;
5854                rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5855
5856                /* Note:
5857                 * -EINTR        (on meta) we got a signal
5858                 * -EAGAIN       (on meta) rcvtimeo expired
5859                 * -ECONNRESET   other side closed the connection
5860                 * -ERESTARTSYS  (on data) we got a signal
5861                 * rv <  0       other than above: unexpected error!
5862                 * rv == expected: full header or command
5863                 * rv <  expected: "woken" by signal during receive
5864                 * rv == 0       : "connection shut down by peer"
5865                 */
5866                if (likely(rv > 0)) {
5867                        received += rv;
5868                        buf      += rv;
5869                } else if (rv == 0) {
5870                        if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5871                                long t;
5872                                rcu_read_lock();
5873                                t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5874                                rcu_read_unlock();
5875
5876                                t = wait_event_timeout(connection->ping_wait,
5877                                                       connection->cstate < C_WF_REPORT_PARAMS,
5878                                                       t);
5879                                if (t)
5880                                        break;
5881                        }
5882                        drbd_err(connection, "meta connection shut down by peer.\n");
5883                        goto reconnect;
5884                } else if (rv == -EAGAIN) {
5885                        /* If the data socket received something meanwhile,
5886                         * that is good enough: peer is still alive. */
5887                        if (time_after(connection->last_received, pre_recv_jif))
5888                                continue;
5889                        if (ping_timeout_active) {
5890                                drbd_err(connection, "PingAck did not arrive in time.\n");
5891                                goto reconnect;
5892                        }
5893                        set_bit(SEND_PING, &connection->flags);
5894                        continue;
5895                } else if (rv == -EINTR) {
5896                        /* maybe drbd_thread_stop(): the while condition will notice.
5897                         * maybe woken for send_ping: we'll send a ping above,
5898                         * and change the rcvtimeo */
5899                        flush_signals(current);
5900                        continue;
5901                } else {
5902                        drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5903                        goto reconnect;
5904                }
5905
5906                if (received == expect && cmd == NULL) {
5907                        if (decode_header(connection, connection->meta.rbuf, &pi))
5908                                goto reconnect;
5909                        cmd = &ack_receiver_tbl[pi.cmd];
5910                        if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
5911                                drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5912                                         cmdname(pi.cmd), pi.cmd);
5913                                goto disconnect;
5914                        }
5915                        expect = header_size + cmd->pkt_size;
5916                        if (pi.size != expect - header_size) {
5917                                drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5918                                        pi.cmd, pi.size);
5919                                goto reconnect;
5920                        }
5921                }
5922                if (received == expect) {
5923                        bool err;
5924
5925                        err = cmd->fn(connection, &pi);
5926                        if (err) {
5927                                drbd_err(connection, "%pf failed\n", cmd->fn);
5928                                goto reconnect;
5929                        }
5930
5931                        connection->last_received = jiffies;
5932
5933                        if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
5934                                set_idle_timeout(connection);
5935                                ping_timeout_active = false;
5936                        }
5937
5938                        buf      = connection->meta.rbuf;
5939                        received = 0;
5940                        expect   = header_size;
5941                        cmd      = NULL;
5942                }
5943        }
5944
5945        if (0) {
5946reconnect:
5947                conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5948                conn_md_sync(connection);
5949        }
5950        if (0) {
5951disconnect:
5952                conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5953        }
5954
5955        drbd_info(connection, "ack_receiver terminated\n");
5956
5957        return 0;
5958}
5959
5960void drbd_send_acks_wf(struct work_struct *ws)
5961{
5962        struct drbd_peer_device *peer_device =
5963                container_of(ws, struct drbd_peer_device, send_acks_work);
5964        struct drbd_connection *connection = peer_device->connection;
5965        struct drbd_device *device = peer_device->device;
5966        struct net_conf *nc;
5967        int tcp_cork, err;
5968
5969        rcu_read_lock();
5970        nc = rcu_dereference(connection->net_conf);
5971        tcp_cork = nc->tcp_cork;
5972        rcu_read_unlock();
5973
5974        if (tcp_cork)
5975                drbd_tcp_cork(connection->meta.socket);
5976
5977        err = drbd_finish_peer_reqs(device);
5978        kref_put(&device->kref, drbd_destroy_device);
5979        /* get is in drbd_endio_write_sec_final(). That is necessary to keep the
5980           struct work_struct send_acks_work alive, which is in the peer_device object */
5981
5982        if (err) {
5983                conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5984                return;
5985        }
5986
5987        if (tcp_cork)
5988                drbd_tcp_uncork(connection->meta.socket);
5989
5990        return;
5991}
5992