linux/drivers/block/drbd/drbd_main.c
   1// SPDX-License-Identifier: GPL-2.0-or-later
   2/*
   3   drbd.c
   4
   5   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
   6
   7   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   8   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   9   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
  10
  11   Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
  12   from Logicworks, Inc. for making SDP replication support possible.
  13
  14
  15 */
  16
  17#define pr_fmt(fmt)     KBUILD_MODNAME ": " fmt
  18
  19#include <linux/module.h>
  20#include <linux/jiffies.h>
  21#include <linux/drbd.h>
  22#include <linux/uaccess.h>
  23#include <asm/types.h>
  24#include <net/sock.h>
  25#include <linux/ctype.h>
  26#include <linux/mutex.h>
  27#include <linux/fs.h>
  28#include <linux/file.h>
  29#include <linux/proc_fs.h>
  30#include <linux/init.h>
  31#include <linux/mm.h>
  32#include <linux/memcontrol.h>
  33#include <linux/mm_inline.h>
  34#include <linux/slab.h>
  35#include <linux/random.h>
  36#include <linux/reboot.h>
  37#include <linux/notifier.h>
  38#include <linux/kthread.h>
  39#include <linux/workqueue.h>
  40#define __KERNEL_SYSCALLS__
  41#include <linux/unistd.h>
  42#include <linux/vmalloc.h>
  43#include <linux/sched/signal.h>
  44
  45#include <linux/drbd_limits.h>
  46#include "drbd_int.h"
  47#include "drbd_protocol.h"
  48#include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
  49#include "drbd_vli.h"
  50#include "drbd_debugfs.h"
  51
  52static DEFINE_MUTEX(drbd_main_mutex);
  53static int drbd_open(struct block_device *bdev, fmode_t mode);
  54static void drbd_release(struct gendisk *gd, fmode_t mode);
  55static void md_sync_timer_fn(struct timer_list *t);
  56static int w_bitmap_io(struct drbd_work *w, int unused);
  57
  58MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
  59              "Lars Ellenberg <lars@linbit.com>");
  60MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
  61MODULE_VERSION(REL_VERSION);
  62MODULE_LICENSE("GPL");
  63MODULE_PARM_DESC(minor_count, "Approximate number of drbd devices ("
  64                 __stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
  65MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
  66
  67#include <linux/moduleparam.h>
  68/* thanks to these macros, if compiled into the kernel (not-module),
  69 * these become boot parameters (e.g., drbd.minor_count) */
  70
  71#ifdef CONFIG_DRBD_FAULT_INJECTION
  72int drbd_enable_faults;
  73int drbd_fault_rate;
  74static int drbd_fault_count;
  75static int drbd_fault_devs;
  76/* bitmap of enabled faults */
  77module_param_named(enable_faults, drbd_enable_faults, int, 0664);
  78/* fault rate % value - applies to all enabled faults */
  79module_param_named(fault_rate, drbd_fault_rate, int, 0664);
  80/* count of faults inserted */
  81module_param_named(fault_count, drbd_fault_count, int, 0664);
  82/* bitmap of devices to insert faults on */
  83module_param_named(fault_devs, drbd_fault_devs, int, 0644);
  84#endif
  85
  86/* module parameters we can keep static */
  87static bool drbd_allow_oos; /* allow_open_on_secondary */
  88static bool drbd_disable_sendpage;
  89MODULE_PARM_DESC(allow_oos, "DONT USE!");
  90module_param_named(allow_oos, drbd_allow_oos, bool, 0);
  91module_param_named(disable_sendpage, drbd_disable_sendpage, bool, 0644);
  92
  93/* module parameters we share */
  94int drbd_proc_details; /* Detail level in proc drbd*/
  95module_param_named(proc_details, drbd_proc_details, int, 0644);
  96/* module parameters shared with defaults */
  97unsigned int drbd_minor_count = DRBD_MINOR_COUNT_DEF;
  98/* Module parameter for setting the user mode helper program
  99 * to run. Default is /sbin/drbdadm */
 100char drbd_usermode_helper[80] = "/sbin/drbdadm";
 101module_param_named(minor_count, drbd_minor_count, uint, 0444);
 102module_param_string(usermode_helper, drbd_usermode_helper, sizeof(drbd_usermode_helper), 0644);
 103
 104/* in 2.6.x, our device mapping and config info contains our virtual gendisks
 105 * as member "struct gendisk *vdisk;"
 106 */
 107struct idr drbd_devices;
 108struct list_head drbd_resources;
 109struct mutex resources_mutex;
 110
 111struct kmem_cache *drbd_request_cache;
 112struct kmem_cache *drbd_ee_cache;       /* peer requests */
 113struct kmem_cache *drbd_bm_ext_cache;   /* bitmap extents */
 114struct kmem_cache *drbd_al_ext_cache;   /* activity log extents */
 115mempool_t drbd_request_mempool;
 116mempool_t drbd_ee_mempool;
 117mempool_t drbd_md_io_page_pool;
 118struct bio_set drbd_md_io_bio_set;
 119struct bio_set drbd_io_bio_set;
 120
 121/* I do not use a standard mempool, because:
 122   1) I want to hand out the pre-allocated objects first.
 123   2) I want to be able to interrupt sleeping allocation with a signal.
  124   Note: This is a singly linked list; the next pointer is the private
 125         member of struct page.
 126 */
 127struct page *drbd_pp_pool;
 128spinlock_t   drbd_pp_lock;
 129int          drbd_pp_vacant;
 130wait_queue_head_t drbd_pp_wait;
 131
 132DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
 133
 134static const struct block_device_operations drbd_ops = {
 135        .owner =   THIS_MODULE,
 136        .open =    drbd_open,
 137        .release = drbd_release,
 138};
 139
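     /* Allocate a single-vector bio for DRBD meta data IO.  Use the dedicated
      * drbd_md_io_bio_set once it has been set up; fall back to the generic
      * bio allocator while that bio_set is not (yet) initialized. */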
 140struct bio *bio_alloc_drbd(gfp_t gfp_mask)
 141{
 142        struct bio *bio;
 143
 144        if (!bioset_initialized(&drbd_md_io_bio_set))
 145                return bio_alloc(gfp_mask, 1);
 146
 147        bio = bio_alloc_bioset(gfp_mask, 1, &drbd_md_io_bio_set);
 148        if (!bio)
 149                return NULL;
 150        return bio;
 151}
 152
 153#ifdef __CHECKER__
  154/* When checking with sparse, if this is an inline function, sparse will
  155   give tons of false positives. When this is a real function, sparse works.
 156 */
 157int _get_ldev_if_state(struct drbd_device *device, enum drbd_disk_state mins)
 158{
 159        int io_allowed;
 160
 161        atomic_inc(&device->local_cnt);
 162        io_allowed = (device->state.disk >= mins);
 163        if (!io_allowed) {
 164                if (atomic_dec_and_test(&device->local_cnt))
 165                        wake_up(&device->misc_wait);
 166        }
 167        return io_allowed;
 168}
 169
 170#endif
 171
 172/**
 173 * tl_release() - mark as BARRIER_ACKED all requests in the corresponding transfer log epoch
 174 * @connection: DRBD connection.
 175 * @barrier_nr: Expected identifier of the DRBD write barrier packet.
 176 * @set_size:   Expected number of requests before that barrier.
 177 *
 178 * In case the passed barrier_nr or set_size does not match the oldest
 179 * epoch of not yet barrier-acked requests, this function will cause a
 180 * termination of the connection.
 181 */
 182void tl_release(struct drbd_connection *connection, unsigned int barrier_nr,
 183                unsigned int set_size)
 184{
 185        struct drbd_request *r;
 186        struct drbd_request *req = NULL;
 187        int expect_epoch = 0;
 188        int expect_size = 0;
 189
 190        spin_lock_irq(&connection->resource->req_lock);
 191
 192        /* find oldest not yet barrier-acked write request,
 193         * count writes in its epoch. */
 194        list_for_each_entry(r, &connection->transfer_log, tl_requests) {
 195                const unsigned s = r->rq_state;
 196                if (!req) {
 197                        if (!(s & RQ_WRITE))
 198                                continue;
 199                        if (!(s & RQ_NET_MASK))
 200                                continue;
 201                        if (s & RQ_NET_DONE)
 202                                continue;
 203                        req = r;
 204                        expect_epoch = req->epoch;
  205                        expect_size++;
 206                } else {
 207                        if (r->epoch != expect_epoch)
 208                                break;
 209                        if (!(s & RQ_WRITE))
 210                                continue;
 211                        /* if (s & RQ_DONE): not expected */
 212                        /* if (!(s & RQ_NET_MASK)): not expected */
 213                        expect_size++;
 214                }
 215        }
 216
 217        /* first some paranoia code */
 218        if (req == NULL) {
 219                drbd_err(connection, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
 220                         barrier_nr);
 221                goto bail;
 222        }
 223        if (expect_epoch != barrier_nr) {
 224                drbd_err(connection, "BAD! BarrierAck #%u received, expected #%u!\n",
 225                         barrier_nr, expect_epoch);
 226                goto bail;
 227        }
 228
 229        if (expect_size != set_size) {
 230                drbd_err(connection, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
 231                         barrier_nr, set_size, expect_size);
 232                goto bail;
 233        }
 234
 235        /* Clean up list of requests processed during current epoch. */
 236        /* this extra list walk restart is paranoia,
 237         * to catch requests being barrier-acked "unexpectedly".
 238         * It usually should find the same req again, or some READ preceding it. */
 239        list_for_each_entry(req, &connection->transfer_log, tl_requests)
 240                if (req->epoch == expect_epoch)
 241                        break;
 242        list_for_each_entry_safe_from(req, r, &connection->transfer_log, tl_requests) {
 243                if (req->epoch != expect_epoch)
 244                        break;
 245                _req_mod(req, BARRIER_ACKED);
 246        }
 247        spin_unlock_irq(&connection->resource->req_lock);
 248
 249        return;
 250
 251bail:
 252        spin_unlock_irq(&connection->resource->req_lock);
 253        conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
 254}
 255
 256
 257/**
 258 * _tl_restart() - Walks the transfer log, and applies an action to all requests
 259 * @connection: DRBD connection to operate on.
 260 * @what:       The action/event to perform with all request objects
 261 *
 262 * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
 263 * RESTART_FROZEN_DISK_IO.
 264 */
 265/* must hold resource->req_lock */
 266void _tl_restart(struct drbd_connection *connection, enum drbd_req_event what)
 267{
 268        struct drbd_request *req, *r;
 269
 270        list_for_each_entry_safe(req, r, &connection->transfer_log, tl_requests)
 271                _req_mod(req, what);
 272}
 273
 274void tl_restart(struct drbd_connection *connection, enum drbd_req_event what)
 275{
 276        spin_lock_irq(&connection->resource->req_lock);
 277        _tl_restart(connection, what);
 278        spin_unlock_irq(&connection->resource->req_lock);
 279}
 280
 281/**
 282 * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
  283 * @connection: DRBD connection.
 284 *
 285 * This is called after the connection to the peer was lost. The storage covered
  286 * by the requests on the transfer log gets marked as out of sync. Called from the
 287 * receiver thread and the worker thread.
 288 */
 289void tl_clear(struct drbd_connection *connection)
 290{
 291        tl_restart(connection, CONNECTION_LOST_WHILE_PENDING);
 292}
 293
 294/**
 295 * tl_abort_disk_io() - Abort disk I/O for all requests for a certain device in the TL
 296 * @device:     DRBD device.
 297 */
 298void tl_abort_disk_io(struct drbd_device *device)
 299{
 300        struct drbd_connection *connection = first_peer_device(device)->connection;
 301        struct drbd_request *req, *r;
 302
 303        spin_lock_irq(&connection->resource->req_lock);
 304        list_for_each_entry_safe(req, r, &connection->transfer_log, tl_requests) {
 305                if (!(req->rq_state & RQ_LOCAL_PENDING))
 306                        continue;
 307                if (req->device != device)
 308                        continue;
 309                _req_mod(req, ABORT_DISK_IO);
 310        }
 311        spin_unlock_irq(&connection->resource->req_lock);
 312}
 313
 314static int drbd_thread_setup(void *arg)
 315{
 316        struct drbd_thread *thi = (struct drbd_thread *) arg;
 317        struct drbd_resource *resource = thi->resource;
 318        unsigned long flags;
 319        int retval;
 320
 321        snprintf(current->comm, sizeof(current->comm), "drbd_%c_%s",
 322                 thi->name[0],
 323                 resource->name);
 324
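     /* Kernel threads ignore signals unless explicitly allowed; DRBD_SIGKILL
      * is what _drbd_thread_stop() sends via send_sig() to interrupt this
      * thread. */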
 325        allow_kernel_signal(DRBD_SIGKILL);
 326        allow_kernel_signal(SIGXCPU);
 327restart:
 328        retval = thi->function(thi);
 329
 330        spin_lock_irqsave(&thi->t_lock, flags);
 331
 332        /* if the receiver has been "EXITING", the last thing it did
 333         * was set the conn state to "StandAlone",
 334         * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
 335         * and receiver thread will be "started".
 336         * drbd_thread_start needs to set "RESTARTING" in that case.
 337         * t_state check and assignment needs to be within the same spinlock,
 338         * so either thread_start sees EXITING, and can remap to RESTARTING,
  339         * or thread_start sees NONE, and can proceed as normal.
 340         */
 341
 342        if (thi->t_state == RESTARTING) {
 343                drbd_info(resource, "Restarting %s thread\n", thi->name);
 344                thi->t_state = RUNNING;
 345                spin_unlock_irqrestore(&thi->t_lock, flags);
 346                goto restart;
 347        }
 348
 349        thi->task = NULL;
 350        thi->t_state = NONE;
 351        smp_mb();
 352        complete_all(&thi->stop);
 353        spin_unlock_irqrestore(&thi->t_lock, flags);
 354
 355        drbd_info(resource, "Terminating %s\n", current->comm);
 356
 357        /* Release mod reference taken when thread was started */
 358
 359        if (thi->connection)
 360                kref_put(&thi->connection->kref, drbd_destroy_connection);
 361        kref_put(&resource->kref, drbd_destroy_resource);
 362        module_put(THIS_MODULE);
 363        return retval;
 364}
 365
 366static void drbd_thread_init(struct drbd_resource *resource, struct drbd_thread *thi,
 367                             int (*func) (struct drbd_thread *), const char *name)
 368{
 369        spin_lock_init(&thi->t_lock);
 370        thi->task    = NULL;
 371        thi->t_state = NONE;
 372        thi->function = func;
 373        thi->resource = resource;
 374        thi->connection = NULL;
 375        thi->name = name;
 376}
 377
 378int drbd_thread_start(struct drbd_thread *thi)
 379{
 380        struct drbd_resource *resource = thi->resource;
 381        struct task_struct *nt;
 382        unsigned long flags;
 383
 384        /* is used from state engine doing drbd_thread_stop_nowait,
 385         * while holding the req lock irqsave */
 386        spin_lock_irqsave(&thi->t_lock, flags);
 387
 388        switch (thi->t_state) {
 389        case NONE:
 390                drbd_info(resource, "Starting %s thread (from %s [%d])\n",
 391                         thi->name, current->comm, current->pid);
 392
 393                /* Get ref on module for thread - this is released when thread exits */
 394                if (!try_module_get(THIS_MODULE)) {
 395                        drbd_err(resource, "Failed to get module reference in drbd_thread_start\n");
 396                        spin_unlock_irqrestore(&thi->t_lock, flags);
 397                        return false;
 398                }
 399
 400                kref_get(&resource->kref);
 401                if (thi->connection)
 402                        kref_get(&thi->connection->kref);
 403
 404                init_completion(&thi->stop);
 405                thi->reset_cpu_mask = 1;
 406                thi->t_state = RUNNING;
 407                spin_unlock_irqrestore(&thi->t_lock, flags);
  408                flush_signals(current); /* otherwise we may get -ERESTARTNOINTR */
 409
 410                nt = kthread_create(drbd_thread_setup, (void *) thi,
 411                                    "drbd_%c_%s", thi->name[0], thi->resource->name);
 412
 413                if (IS_ERR(nt)) {
 414                        drbd_err(resource, "Couldn't start thread\n");
 415
 416                        if (thi->connection)
 417                                kref_put(&thi->connection->kref, drbd_destroy_connection);
 418                        kref_put(&resource->kref, drbd_destroy_resource);
 419                        module_put(THIS_MODULE);
 420                        return false;
 421                }
 422                spin_lock_irqsave(&thi->t_lock, flags);
 423                thi->task = nt;
 424                thi->t_state = RUNNING;
 425                spin_unlock_irqrestore(&thi->t_lock, flags);
 426                wake_up_process(nt);
 427                break;
 428        case EXITING:
 429                thi->t_state = RESTARTING;
 430                drbd_info(resource, "Restarting %s thread (from %s [%d])\n",
 431                                thi->name, current->comm, current->pid);
 432                /* fall through */
 433        case RUNNING:
 434        case RESTARTING:
 435        default:
 436                spin_unlock_irqrestore(&thi->t_lock, flags);
 437                break;
 438        }
 439
 440        return true;
 441}
 442
 443
 444void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
 445{
 446        unsigned long flags;
 447
 448        enum drbd_thread_state ns = restart ? RESTARTING : EXITING;
 449
 450        /* may be called from state engine, holding the req lock irqsave */
 451        spin_lock_irqsave(&thi->t_lock, flags);
 452
 453        if (thi->t_state == NONE) {
 454                spin_unlock_irqrestore(&thi->t_lock, flags);
 455                if (restart)
 456                        drbd_thread_start(thi);
 457                return;
 458        }
 459
 460        if (thi->t_state != ns) {
 461                if (thi->task == NULL) {
 462                        spin_unlock_irqrestore(&thi->t_lock, flags);
 463                        return;
 464                }
 465
 466                thi->t_state = ns;
 467                smp_mb();
 468                init_completion(&thi->stop);
 469                if (thi->task != current)
 470                        send_sig(DRBD_SIGKILL, thi->task, 1);
 471        }
 472
 473        spin_unlock_irqrestore(&thi->t_lock, flags);
 474
 475        if (wait)
 476                wait_for_completion(&thi->stop);
 477}
 478
 479int conn_lowest_minor(struct drbd_connection *connection)
 480{
 481        struct drbd_peer_device *peer_device;
 482        int vnr = 0, minor = -1;
 483
 484        rcu_read_lock();
 485        peer_device = idr_get_next(&connection->peer_devices, &vnr);
 486        if (peer_device)
 487                minor = device_to_minor(peer_device->device);
 488        rcu_read_unlock();
 489
 490        return minor;
 491}
 492
 493#ifdef CONFIG_SMP
 494/**
 495 * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
 496 *
 497 * Forces all threads of a resource onto the same CPU. This is beneficial for
  498 * DRBD's performance. May be overridden by the user's configuration.
 499 */
 500static void drbd_calc_cpu_mask(cpumask_var_t *cpu_mask)
 501{
 502        unsigned int *resources_per_cpu, min_index = ~0;
 503
 504        resources_per_cpu = kcalloc(nr_cpu_ids, sizeof(*resources_per_cpu),
 505                                    GFP_KERNEL);
 506        if (resources_per_cpu) {
 507                struct drbd_resource *resource;
 508                unsigned int cpu, min = ~0;
 509
 510                rcu_read_lock();
 511                for_each_resource_rcu(resource, &drbd_resources) {
 512                        for_each_cpu(cpu, resource->cpu_mask)
 513                                resources_per_cpu[cpu]++;
 514                }
 515                rcu_read_unlock();
 516                for_each_online_cpu(cpu) {
 517                        if (resources_per_cpu[cpu] < min) {
 518                                min = resources_per_cpu[cpu];
 519                                min_index = cpu;
 520                        }
 521                }
 522                kfree(resources_per_cpu);
 523        }
 524        if (min_index == ~0) {
 525                cpumask_setall(*cpu_mask);
 526                return;
 527        }
 528        cpumask_set_cpu(min_index, *cpu_mask);
 529}
 530
 531/**
 532 * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
  534 * @thi:        drbd_thread object
 535 *
  536 * Call this in the "main loop" of _all_ threads; there is no need for any mutex,
  537 * current won't die prematurely.
 538 */
 539void drbd_thread_current_set_cpu(struct drbd_thread *thi)
 540{
 541        struct drbd_resource *resource = thi->resource;
 542        struct task_struct *p = current;
 543
 544        if (!thi->reset_cpu_mask)
 545                return;
 546        thi->reset_cpu_mask = 0;
 547        set_cpus_allowed_ptr(p, resource->cpu_mask);
 548}
 549#else
 550#define drbd_calc_cpu_mask(A) ({})
 551#endif
 552
 553/**
 554 * drbd_header_size  -  size of a packet header
 555 *
 556 * The header size is a multiple of 8, so any payload following the header is
 557 * word aligned on 64-bit architectures.  (The bitmap send and receive code
 558 * relies on this.)
 559 */
 560unsigned int drbd_header_size(struct drbd_connection *connection)
 561{
 562        if (connection->agreed_pro_version >= 100) {
 563                BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header100), 8));
 564                return sizeof(struct p_header100);
 565        } else {
 566                BUILD_BUG_ON(sizeof(struct p_header80) !=
 567                             sizeof(struct p_header95));
 568                BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header80), 8));
 569                return sizeof(struct p_header80);
 570        }
 571}
 572
 573static unsigned int prepare_header80(struct p_header80 *h, enum drbd_packet cmd, int size)
 574{
 575        h->magic   = cpu_to_be32(DRBD_MAGIC);
 576        h->command = cpu_to_be16(cmd);
 577        h->length  = cpu_to_be16(size);
 578        return sizeof(struct p_header80);
 579}
 580
 581static unsigned int prepare_header95(struct p_header95 *h, enum drbd_packet cmd, int size)
 582{
 583        h->magic   = cpu_to_be16(DRBD_MAGIC_BIG);
 584        h->command = cpu_to_be16(cmd);
 585        h->length = cpu_to_be32(size);
 586        return sizeof(struct p_header95);
 587}
 588
 589static unsigned int prepare_header100(struct p_header100 *h, enum drbd_packet cmd,
 590                                      int size, int vnr)
 591{
 592        h->magic = cpu_to_be32(DRBD_MAGIC_100);
 593        h->volume = cpu_to_be16(vnr);
 594        h->command = cpu_to_be16(cmd);
 595        h->length = cpu_to_be32(size);
 596        h->pad = 0;
 597        return sizeof(struct p_header100);
 598}
 599
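     /* Pick the on-the-wire header format from the agreed protocol version:
      * 100 and newer always use p_header100 (which carries the volume number),
      * 95 to 99 use p_header95 only when the payload exceeds
      * DRBD_MAX_SIZE_H80_PACKET (p_header80 has just a 16-bit length field),
      * everything else uses p_header80. */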
 600static unsigned int prepare_header(struct drbd_connection *connection, int vnr,
 601                                   void *buffer, enum drbd_packet cmd, int size)
 602{
 603        if (connection->agreed_pro_version >= 100)
 604                return prepare_header100(buffer, cmd, size, vnr);
 605        else if (connection->agreed_pro_version >= 95 &&
 606                 size > DRBD_MAX_SIZE_H80_PACKET)
 607                return prepare_header95(buffer, cmd, size);
 608        else
 609                return prepare_header80(buffer, cmd, size);
 610}
 611
 612static void *__conn_prepare_command(struct drbd_connection *connection,
 613                                    struct drbd_socket *sock)
 614{
 615        if (!sock->socket)
 616                return NULL;
 617        return sock->sbuf + drbd_header_size(connection);
 618}
 619
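     /* conn_prepare_command() returns a pointer into the send buffer just
      * behind the packet header and, on success, leaves sock->mutex held.
      * The matching conn_send_command()/drbd_send_command() sends the packet
      * and drops the mutex again; on failure the mutex is released right here
      * and NULL is returned. */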
 620void *conn_prepare_command(struct drbd_connection *connection, struct drbd_socket *sock)
 621{
 622        void *p;
 623
 624        mutex_lock(&sock->mutex);
 625        p = __conn_prepare_command(connection, sock);
 626        if (!p)
 627                mutex_unlock(&sock->mutex);
 628
 629        return p;
 630}
 631
 632void *drbd_prepare_command(struct drbd_peer_device *peer_device, struct drbd_socket *sock)
 633{
 634        return conn_prepare_command(peer_device->connection, sock);
 635}
 636
 637static int __send_command(struct drbd_connection *connection, int vnr,
 638                          struct drbd_socket *sock, enum drbd_packet cmd,
 639                          unsigned int header_size, void *data,
 640                          unsigned int size)
 641{
 642        int msg_flags;
 643        int err;
 644
 645        /*
 646         * Called with @data == NULL and the size of the data blocks in @size
 647         * for commands that send data blocks.  For those commands, omit the
 648         * MSG_MORE flag: this will increase the likelihood that data blocks
 649         * which are page aligned on the sender will end up page aligned on the
 650         * receiver.
 651         */
 652        msg_flags = data ? MSG_MORE : 0;
 653
 654        header_size += prepare_header(connection, vnr, sock->sbuf, cmd,
 655                                      header_size + size);
 656        err = drbd_send_all(connection, sock->socket, sock->sbuf, header_size,
 657                            msg_flags);
 658        if (data && !err)
 659                err = drbd_send_all(connection, sock->socket, data, size, 0);
 660        /* DRBD protocol "pings" are latency critical.
 661         * This is supposed to trigger tcp_push_pending_frames() */
 662        if (!err && (cmd == P_PING || cmd == P_PING_ACK))
 663                drbd_tcp_nodelay(sock->socket);
 664
 665        return err;
 666}
 667
 668static int __conn_send_command(struct drbd_connection *connection, struct drbd_socket *sock,
 669                               enum drbd_packet cmd, unsigned int header_size,
 670                               void *data, unsigned int size)
 671{
 672        return __send_command(connection, 0, sock, cmd, header_size, data, size);
 673}
 674
 675int conn_send_command(struct drbd_connection *connection, struct drbd_socket *sock,
 676                      enum drbd_packet cmd, unsigned int header_size,
 677                      void *data, unsigned int size)
 678{
 679        int err;
 680
 681        err = __conn_send_command(connection, sock, cmd, header_size, data, size);
 682        mutex_unlock(&sock->mutex);
 683        return err;
 684}
 685
 686int drbd_send_command(struct drbd_peer_device *peer_device, struct drbd_socket *sock,
 687                      enum drbd_packet cmd, unsigned int header_size,
 688                      void *data, unsigned int size)
 689{
 690        int err;
 691
 692        err = __send_command(peer_device->connection, peer_device->device->vnr,
 693                             sock, cmd, header_size, data, size);
 694        mutex_unlock(&sock->mutex);
 695        return err;
 696}
 697
 698int drbd_send_ping(struct drbd_connection *connection)
 699{
 700        struct drbd_socket *sock;
 701
 702        sock = &connection->meta;
 703        if (!conn_prepare_command(connection, sock))
 704                return -EIO;
 705        return conn_send_command(connection, sock, P_PING, 0, NULL, 0);
 706}
 707
 708int drbd_send_ping_ack(struct drbd_connection *connection)
 709{
 710        struct drbd_socket *sock;
 711
 712        sock = &connection->meta;
 713        if (!conn_prepare_command(connection, sock))
 714                return -EIO;
 715        return conn_send_command(connection, sock, P_PING_ACK, 0, NULL, 0);
 716}
 717
 718int drbd_send_sync_param(struct drbd_peer_device *peer_device)
 719{
 720        struct drbd_socket *sock;
 721        struct p_rs_param_95 *p;
 722        int size;
 723        const int apv = peer_device->connection->agreed_pro_version;
 724        enum drbd_packet cmd;
 725        struct net_conf *nc;
 726        struct disk_conf *dc;
 727
 728        sock = &peer_device->connection->data;
 729        p = drbd_prepare_command(peer_device, sock);
 730        if (!p)
 731                return -EIO;
 732
 733        rcu_read_lock();
 734        nc = rcu_dereference(peer_device->connection->net_conf);
 735
 736        size = apv <= 87 ? sizeof(struct p_rs_param)
 737                : apv == 88 ? sizeof(struct p_rs_param)
 738                        + strlen(nc->verify_alg) + 1
 739                : apv <= 94 ? sizeof(struct p_rs_param_89)
 740                : /* apv >= 95 */ sizeof(struct p_rs_param_95);
 741
 742        cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
 743
 744        /* initialize verify_alg and csums_alg */
 745        memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
 746
 747        if (get_ldev(peer_device->device)) {
 748                dc = rcu_dereference(peer_device->device->ldev->disk_conf);
 749                p->resync_rate = cpu_to_be32(dc->resync_rate);
 750                p->c_plan_ahead = cpu_to_be32(dc->c_plan_ahead);
 751                p->c_delay_target = cpu_to_be32(dc->c_delay_target);
 752                p->c_fill_target = cpu_to_be32(dc->c_fill_target);
 753                p->c_max_rate = cpu_to_be32(dc->c_max_rate);
 754                put_ldev(peer_device->device);
 755        } else {
 756                p->resync_rate = cpu_to_be32(DRBD_RESYNC_RATE_DEF);
 757                p->c_plan_ahead = cpu_to_be32(DRBD_C_PLAN_AHEAD_DEF);
 758                p->c_delay_target = cpu_to_be32(DRBD_C_DELAY_TARGET_DEF);
 759                p->c_fill_target = cpu_to_be32(DRBD_C_FILL_TARGET_DEF);
 760                p->c_max_rate = cpu_to_be32(DRBD_C_MAX_RATE_DEF);
 761        }
 762
 763        if (apv >= 88)
 764                strcpy(p->verify_alg, nc->verify_alg);
 765        if (apv >= 89)
 766                strcpy(p->csums_alg, nc->csums_alg);
 767        rcu_read_unlock();
 768
 769        return drbd_send_command(peer_device, sock, cmd, size, NULL, 0);
 770}
 771
 772int __drbd_send_protocol(struct drbd_connection *connection, enum drbd_packet cmd)
 773{
 774        struct drbd_socket *sock;
 775        struct p_protocol *p;
 776        struct net_conf *nc;
 777        int size, cf;
 778
 779        sock = &connection->data;
 780        p = __conn_prepare_command(connection, sock);
 781        if (!p)
 782                return -EIO;
 783
 784        rcu_read_lock();
 785        nc = rcu_dereference(connection->net_conf);
 786
 787        if (nc->tentative && connection->agreed_pro_version < 92) {
 788                rcu_read_unlock();
 789                mutex_unlock(&sock->mutex);
 790                drbd_err(connection, "--dry-run is not supported by peer");
 791                return -EOPNOTSUPP;
 792        }
 793
 794        size = sizeof(*p);
 795        if (connection->agreed_pro_version >= 87)
 796                size += strlen(nc->integrity_alg) + 1;
 797
 798        p->protocol      = cpu_to_be32(nc->wire_protocol);
 799        p->after_sb_0p   = cpu_to_be32(nc->after_sb_0p);
 800        p->after_sb_1p   = cpu_to_be32(nc->after_sb_1p);
 801        p->after_sb_2p   = cpu_to_be32(nc->after_sb_2p);
 802        p->two_primaries = cpu_to_be32(nc->two_primaries);
 803        cf = 0;
 804        if (nc->discard_my_data)
 805                cf |= CF_DISCARD_MY_DATA;
 806        if (nc->tentative)
 807                cf |= CF_DRY_RUN;
 808        p->conn_flags    = cpu_to_be32(cf);
 809
 810        if (connection->agreed_pro_version >= 87)
 811                strcpy(p->integrity_alg, nc->integrity_alg);
 812        rcu_read_unlock();
 813
 814        return __conn_send_command(connection, sock, cmd, size, NULL, 0);
 815}
 816
 817int drbd_send_protocol(struct drbd_connection *connection)
 818{
 819        int err;
 820
 821        mutex_lock(&connection->data.mutex);
 822        err = __drbd_send_protocol(connection, P_PROTOCOL);
 823        mutex_unlock(&connection->data.mutex);
 824
 825        return err;
 826}
 827
 828static int _drbd_send_uuids(struct drbd_peer_device *peer_device, u64 uuid_flags)
 829{
 830        struct drbd_device *device = peer_device->device;
 831        struct drbd_socket *sock;
 832        struct p_uuids *p;
 833        int i;
 834
 835        if (!get_ldev_if_state(device, D_NEGOTIATING))
 836                return 0;
 837
 838        sock = &peer_device->connection->data;
 839        p = drbd_prepare_command(peer_device, sock);
 840        if (!p) {
 841                put_ldev(device);
 842                return -EIO;
 843        }
 844        spin_lock_irq(&device->ldev->md.uuid_lock);
 845        for (i = UI_CURRENT; i < UI_SIZE; i++)
 846                p->uuid[i] = cpu_to_be64(device->ldev->md.uuid[i]);
 847        spin_unlock_irq(&device->ldev->md.uuid_lock);
 848
 849        device->comm_bm_set = drbd_bm_total_weight(device);
 850        p->uuid[UI_SIZE] = cpu_to_be64(device->comm_bm_set);
 851        rcu_read_lock();
 852        uuid_flags |= rcu_dereference(peer_device->connection->net_conf)->discard_my_data ? 1 : 0;
 853        rcu_read_unlock();
 854        uuid_flags |= test_bit(CRASHED_PRIMARY, &device->flags) ? 2 : 0;
 855        uuid_flags |= device->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
 856        p->uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
 857
 858        put_ldev(device);
 859        return drbd_send_command(peer_device, sock, P_UUIDS, sizeof(*p), NULL, 0);
 860}
 861
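     /* uuid_flags bits sent in p->uuid[UI_FLAGS] above: 1 = discard my data,
      * 2 = crashed primary, 4 = local disk was D_INCONSISTENT; 8 is passed in
      * by drbd_send_uuids_skip_initial_sync() below. */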
 862int drbd_send_uuids(struct drbd_peer_device *peer_device)
 863{
 864        return _drbd_send_uuids(peer_device, 0);
 865}
 866
 867int drbd_send_uuids_skip_initial_sync(struct drbd_peer_device *peer_device)
 868{
 869        return _drbd_send_uuids(peer_device, 8);
 870}
 871
 872void drbd_print_uuids(struct drbd_device *device, const char *text)
 873{
 874        if (get_ldev_if_state(device, D_NEGOTIATING)) {
 875                u64 *uuid = device->ldev->md.uuid;
 876                drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX\n",
 877                     text,
 878                     (unsigned long long)uuid[UI_CURRENT],
 879                     (unsigned long long)uuid[UI_BITMAP],
 880                     (unsigned long long)uuid[UI_HISTORY_START],
 881                     (unsigned long long)uuid[UI_HISTORY_END]);
 882                put_ldev(device);
 883        } else {
 884                drbd_info(device, "%s effective data uuid: %016llX\n",
 885                                text,
 886                                (unsigned long long)device->ed_uuid);
 887        }
 888}
 889
 890void drbd_gen_and_send_sync_uuid(struct drbd_peer_device *peer_device)
 891{
 892        struct drbd_device *device = peer_device->device;
 893        struct drbd_socket *sock;
 894        struct p_rs_uuid *p;
 895        u64 uuid;
 896
 897        D_ASSERT(device, device->state.disk == D_UP_TO_DATE);
 898
 899        uuid = device->ldev->md.uuid[UI_BITMAP];
 900        if (uuid && uuid != UUID_JUST_CREATED)
 901                uuid = uuid + UUID_NEW_BM_OFFSET;
 902        else
 903                get_random_bytes(&uuid, sizeof(u64));
 904        drbd_uuid_set(device, UI_BITMAP, uuid);
 905        drbd_print_uuids(device, "updated sync UUID");
 906        drbd_md_sync(device);
 907
 908        sock = &peer_device->connection->data;
 909        p = drbd_prepare_command(peer_device, sock);
 910        if (p) {
 911                p->uuid = cpu_to_be64(uuid);
 912                drbd_send_command(peer_device, sock, P_SYNC_UUID, sizeof(*p), NULL, 0);
 913        }
 914}
 915
 916/* communicated if (agreed_features & DRBD_FF_WSAME) */
 917static void
 918assign_p_sizes_qlim(struct drbd_device *device, struct p_sizes *p,
 919                                        struct request_queue *q)
 920{
 921        if (q) {
 922                p->qlim->physical_block_size = cpu_to_be32(queue_physical_block_size(q));
 923                p->qlim->logical_block_size = cpu_to_be32(queue_logical_block_size(q));
 924                p->qlim->alignment_offset = cpu_to_be32(queue_alignment_offset(q));
 925                p->qlim->io_min = cpu_to_be32(queue_io_min(q));
 926                p->qlim->io_opt = cpu_to_be32(queue_io_opt(q));
 927                p->qlim->discard_enabled = blk_queue_discard(q);
 928                p->qlim->write_same_capable = !!q->limits.max_write_same_sectors;
 929        } else {
 930                q = device->rq_queue;
 931                p->qlim->physical_block_size = cpu_to_be32(queue_physical_block_size(q));
 932                p->qlim->logical_block_size = cpu_to_be32(queue_logical_block_size(q));
 933                p->qlim->alignment_offset = 0;
 934                p->qlim->io_min = cpu_to_be32(queue_io_min(q));
 935                p->qlim->io_opt = cpu_to_be32(queue_io_opt(q));
 936                p->qlim->discard_enabled = 0;
 937                p->qlim->write_same_capable = 0;
 938        }
 939}
 940
 941int drbd_send_sizes(struct drbd_peer_device *peer_device, int trigger_reply, enum dds_flags flags)
 942{
 943        struct drbd_device *device = peer_device->device;
 944        struct drbd_socket *sock;
 945        struct p_sizes *p;
 946        sector_t d_size, u_size;
 947        int q_order_type;
 948        unsigned int max_bio_size;
 949        unsigned int packet_size;
 950
 951        sock = &peer_device->connection->data;
 952        p = drbd_prepare_command(peer_device, sock);
 953        if (!p)
 954                return -EIO;
 955
 956        packet_size = sizeof(*p);
 957        if (peer_device->connection->agreed_features & DRBD_FF_WSAME)
 958                packet_size += sizeof(p->qlim[0]);
 959
 960        memset(p, 0, packet_size);
 961        if (get_ldev_if_state(device, D_NEGOTIATING)) {
 962                struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
 963                d_size = drbd_get_max_capacity(device->ldev);
 964                rcu_read_lock();
 965                u_size = rcu_dereference(device->ldev->disk_conf)->disk_size;
 966                rcu_read_unlock();
 967                q_order_type = drbd_queue_order_type(device);
 968                max_bio_size = queue_max_hw_sectors(q) << 9;
 969                max_bio_size = min(max_bio_size, DRBD_MAX_BIO_SIZE);
 970                assign_p_sizes_qlim(device, p, q);
 971                put_ldev(device);
 972        } else {
 973                d_size = 0;
 974                u_size = 0;
 975                q_order_type = QUEUE_ORDERED_NONE;
 976                max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
 977                assign_p_sizes_qlim(device, p, NULL);
 978        }
 979
 980        if (peer_device->connection->agreed_pro_version <= 94)
 981                max_bio_size = min(max_bio_size, DRBD_MAX_SIZE_H80_PACKET);
 982        else if (peer_device->connection->agreed_pro_version < 100)
 983                max_bio_size = min(max_bio_size, DRBD_MAX_BIO_SIZE_P95);
 984
 985        p->d_size = cpu_to_be64(d_size);
 986        p->u_size = cpu_to_be64(u_size);
 987        p->c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(device->this_bdev));
 988        p->max_bio_size = cpu_to_be32(max_bio_size);
 989        p->queue_order_type = cpu_to_be16(q_order_type);
 990        p->dds_flags = cpu_to_be16(flags);
 991
 992        return drbd_send_command(peer_device, sock, P_SIZES, packet_size, NULL, 0);
 993}
 994
 995/**
 996 * drbd_send_current_state() - Sends the drbd state to the peer
 997 * @peer_device:        DRBD peer device.
 998 */
 999int drbd_send_current_state(struct drbd_peer_device *peer_device)
1000{
1001        struct drbd_socket *sock;
1002        struct p_state *p;
1003
1004        sock = &peer_device->connection->data;
1005        p = drbd_prepare_command(peer_device, sock);
1006        if (!p)
1007                return -EIO;
1008        p->state = cpu_to_be32(peer_device->device->state.i); /* Within the send mutex */
1009        return drbd_send_command(peer_device, sock, P_STATE, sizeof(*p), NULL, 0);
1010}
1011
1012/**
1013 * drbd_send_state() - After a state change, sends the new state to the peer
1014 * @peer_device:      DRBD peer device.
1015 * @state:     the state to send, not necessarily the current state.
1016 *
1017 * Each state change queues an "after_state_ch" work, which will eventually
1018 * send the resulting new state to the peer. If more state changes happen
1019 * between queuing and processing of the after_state_ch work, we still
1020 * want to send each intermediary state in the order it occurred.
1021 */
1022int drbd_send_state(struct drbd_peer_device *peer_device, union drbd_state state)
1023{
1024        struct drbd_socket *sock;
1025        struct p_state *p;
1026
1027        sock = &peer_device->connection->data;
1028        p = drbd_prepare_command(peer_device, sock);
1029        if (!p)
1030                return -EIO;
1031        p->state = cpu_to_be32(state.i); /* Within the send mutex */
1032        return drbd_send_command(peer_device, sock, P_STATE, sizeof(*p), NULL, 0);
1033}
1034
1035int drbd_send_state_req(struct drbd_peer_device *peer_device, union drbd_state mask, union drbd_state val)
1036{
1037        struct drbd_socket *sock;
1038        struct p_req_state *p;
1039
1040        sock = &peer_device->connection->data;
1041        p = drbd_prepare_command(peer_device, sock);
1042        if (!p)
1043                return -EIO;
1044        p->mask = cpu_to_be32(mask.i);
1045        p->val = cpu_to_be32(val.i);
1046        return drbd_send_command(peer_device, sock, P_STATE_CHG_REQ, sizeof(*p), NULL, 0);
1047}
1048
1049int conn_send_state_req(struct drbd_connection *connection, union drbd_state mask, union drbd_state val)
1050{
1051        enum drbd_packet cmd;
1052        struct drbd_socket *sock;
1053        struct p_req_state *p;
1054
1055        cmd = connection->agreed_pro_version < 100 ? P_STATE_CHG_REQ : P_CONN_ST_CHG_REQ;
1056        sock = &connection->data;
1057        p = conn_prepare_command(connection, sock);
1058        if (!p)
1059                return -EIO;
1060        p->mask = cpu_to_be32(mask.i);
1061        p->val = cpu_to_be32(val.i);
1062        return conn_send_command(connection, sock, cmd, sizeof(*p), NULL, 0);
1063}
1064
1065void drbd_send_sr_reply(struct drbd_peer_device *peer_device, enum drbd_state_rv retcode)
1066{
1067        struct drbd_socket *sock;
1068        struct p_req_state_reply *p;
1069
1070        sock = &peer_device->connection->meta;
1071        p = drbd_prepare_command(peer_device, sock);
1072        if (p) {
1073                p->retcode = cpu_to_be32(retcode);
1074                drbd_send_command(peer_device, sock, P_STATE_CHG_REPLY, sizeof(*p), NULL, 0);
1075        }
1076}
1077
1078void conn_send_sr_reply(struct drbd_connection *connection, enum drbd_state_rv retcode)
1079{
1080        struct drbd_socket *sock;
1081        struct p_req_state_reply *p;
1082        enum drbd_packet cmd = connection->agreed_pro_version < 100 ? P_STATE_CHG_REPLY : P_CONN_ST_CHG_REPLY;
1083
1084        sock = &connection->meta;
1085        p = conn_prepare_command(connection, sock);
1086        if (p) {
1087                p->retcode = cpu_to_be32(retcode);
1088                conn_send_command(connection, sock, cmd, sizeof(*p), NULL, 0);
1089        }
1090}
1091
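     /* Helpers for the "encoding" byte of a compressed bitmap packet:
      * bits 0-3 hold the drbd_bitmap_code, bits 4-6 the number of unused pad
      * bits in the last code byte, bit 7 whether the first run length
      * describes set bits (see fill_bitmap_rle_bits() below). */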
1092static void dcbp_set_code(struct p_compressed_bm *p, enum drbd_bitmap_code code)
1093{
1094        BUG_ON(code & ~0xf);
1095        p->encoding = (p->encoding & ~0xf) | code;
1096}
1097
1098static void dcbp_set_start(struct p_compressed_bm *p, int set)
1099{
1100        p->encoding = (p->encoding & ~0x80) | (set ? 0x80 : 0);
1101}
1102
1103static void dcbp_set_pad_bits(struct p_compressed_bm *p, int n)
1104{
1105        BUG_ON(n & ~0x7);
1106        p->encoding = (p->encoding & (~0x7 << 4)) | (n << 4);
1107}
1108
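     /* Compress the next chunk of the bitmap into p->code using RLE + VLI.
      * Returns the number of code bytes produced, 0 if RLE is not used
      * (feature disabled, nothing left to transfer, an encoding error, or the
      * chunk turned out to be incompressible; the caller then sends plain
      * bitmap words instead), or -1 on an unexpected zero run length. */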
1109static int fill_bitmap_rle_bits(struct drbd_device *device,
1110                         struct p_compressed_bm *p,
1111                         unsigned int size,
1112                         struct bm_xfer_ctx *c)
1113{
1114        struct bitstream bs;
1115        unsigned long plain_bits;
1116        unsigned long tmp;
1117        unsigned long rl;
1118        unsigned len;
1119        unsigned toggle;
1120        int bits, use_rle;
1121
1122        /* may we use this feature? */
1123        rcu_read_lock();
1124        use_rle = rcu_dereference(first_peer_device(device)->connection->net_conf)->use_rle;
1125        rcu_read_unlock();
1126        if (!use_rle || first_peer_device(device)->connection->agreed_pro_version < 90)
1127                return 0;
1128
1129        if (c->bit_offset >= c->bm_bits)
1130                return 0; /* nothing to do. */
1131
1132        /* use at most thus many bytes */
1133        bitstream_init(&bs, p->code, size, 0);
1134        memset(p->code, 0, size);
1135        /* plain bits covered in this code string */
1136        plain_bits = 0;
1137
1138        /* p->encoding & 0x80 stores whether the first run length is set.
1139         * bit offset is implicit.
1140         * start with toggle == 2 to be able to tell the first iteration */
1141        toggle = 2;
1142
 1143        /* see how many plain bits we can stuff into one packet
1144         * using RLE and VLI. */
1145        do {
1146                tmp = (toggle == 0) ? _drbd_bm_find_next_zero(device, c->bit_offset)
1147                                    : _drbd_bm_find_next(device, c->bit_offset);
1148                if (tmp == -1UL)
1149                        tmp = c->bm_bits;
1150                rl = tmp - c->bit_offset;
1151
1152                if (toggle == 2) { /* first iteration */
1153                        if (rl == 0) {
1154                                /* the first checked bit was set,
1155                                 * store start value, */
1156                                dcbp_set_start(p, 1);
1157                                /* but skip encoding of zero run length */
1158                                toggle = !toggle;
1159                                continue;
1160                        }
1161                        dcbp_set_start(p, 0);
1162                }
1163
1164                /* paranoia: catch zero runlength.
1165                 * can only happen if bitmap is modified while we scan it. */
1166                if (rl == 0) {
1167                        drbd_err(device, "unexpected zero runlength while encoding bitmap "
1168                            "t:%u bo:%lu\n", toggle, c->bit_offset);
1169                        return -1;
1170                }
1171
1172                bits = vli_encode_bits(&bs, rl);
1173                if (bits == -ENOBUFS) /* buffer full */
1174                        break;
1175                if (bits <= 0) {
1176                        drbd_err(device, "error while encoding bitmap: %d\n", bits);
1177                        return 0;
1178                }
1179
1180                toggle = !toggle;
1181                plain_bits += rl;
1182                c->bit_offset = tmp;
1183        } while (c->bit_offset < c->bm_bits);
1184
1185        len = bs.cur.b - p->code + !!bs.cur.bit;
1186
1187        if (plain_bits < (len << 3)) {
1188                /* incompressible with this method.
1189                 * we need to rewind both word and bit position. */
1190                c->bit_offset -= plain_bits;
1191                bm_xfer_ctx_bit_to_word_offset(c);
1192                c->bit_offset = c->word_offset * BITS_PER_LONG;
1193                return 0;
1194        }
1195
1196        /* RLE + VLI was able to compress it just fine.
1197         * update c->word_offset. */
1198        bm_xfer_ctx_bit_to_word_offset(c);
1199
1200        /* store pad_bits */
1201        dcbp_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1202
1203        return len;
1204}
1205
1206/**
 1207 * send_bitmap_rle_or_plain() - send one bitmap packet, compressed if possible
1208 *
1209 * Return 0 when done, 1 when another iteration is needed, and a negative error
1210 * code upon failure.
1211 */
1212static int
1213send_bitmap_rle_or_plain(struct drbd_device *device, struct bm_xfer_ctx *c)
1214{
1215        struct drbd_socket *sock = &first_peer_device(device)->connection->data;
1216        unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
1217        struct p_compressed_bm *p = sock->sbuf + header_size;
1218        int len, err;
1219
1220        len = fill_bitmap_rle_bits(device, p,
1221                        DRBD_SOCKET_BUFFER_SIZE - header_size - sizeof(*p), c);
1222        if (len < 0)
1223                return -EIO;
1224
1225        if (len) {
1226                dcbp_set_code(p, RLE_VLI_Bits);
1227                err = __send_command(first_peer_device(device)->connection, device->vnr, sock,
1228                                     P_COMPRESSED_BITMAP, sizeof(*p) + len,
1229                                     NULL, 0);
1230                c->packets[0]++;
1231                c->bytes[0] += header_size + sizeof(*p) + len;
1232
1233                if (c->bit_offset >= c->bm_bits)
1234                        len = 0; /* DONE */
1235        } else {
1236                /* was not compressible.
1237                 * send a buffer full of plain text bits instead. */
1238                unsigned int data_size;
1239                unsigned long num_words;
1240                unsigned long *p = sock->sbuf + header_size;
1241
1242                data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
1243                num_words = min_t(size_t, data_size / sizeof(*p),
1244                                  c->bm_words - c->word_offset);
1245                len = num_words * sizeof(*p);
1246                if (len)
1247                        drbd_bm_get_lel(device, c->word_offset, num_words, p);
1248                err = __send_command(first_peer_device(device)->connection, device->vnr, sock, P_BITMAP, len, NULL, 0);
1249                c->word_offset += num_words;
1250                c->bit_offset = c->word_offset * BITS_PER_LONG;
1251
1252                c->packets[1]++;
1253                c->bytes[1] += header_size + len;
1254
1255                if (c->bit_offset > c->bm_bits)
1256                        c->bit_offset = c->bm_bits;
1257        }
1258        if (!err) {
1259                if (len == 0) {
1260                        INFO_bm_xfer_stats(device, "send", c);
1261                        return 0;
1262                } else
1263                        return 1;
1264        }
1265        return -EIO;
1266}
1267
1268/* See the comment at receive_bitmap() */
1269static int _drbd_send_bitmap(struct drbd_device *device)
1270{
1271        struct bm_xfer_ctx c;
1272        int err;
1273
1274        if (!expect(device->bitmap))
1275                return false;
1276
1277        if (get_ldev(device)) {
1278                if (drbd_md_test_flag(device->ldev, MDF_FULL_SYNC)) {
1279                        drbd_info(device, "Writing the whole bitmap, MDF_FullSync was set.\n");
1280                        drbd_bm_set_all(device);
1281                        if (drbd_bm_write(device)) {
1282                                /* write_bm did fail! Leave full sync flag set in Meta P_DATA
1283                                 * but otherwise process as per normal - need to tell other
1284                                 * side that a full resync is required! */
1285                                drbd_err(device, "Failed to write bitmap to disk!\n");
1286                        } else {
1287                                drbd_md_clear_flag(device, MDF_FULL_SYNC);
1288                                drbd_md_sync(device);
1289                        }
1290                }
1291                put_ldev(device);
1292        }
1293
1294        c = (struct bm_xfer_ctx) {
1295                .bm_bits = drbd_bm_bits(device),
1296                .bm_words = drbd_bm_words(device),
1297        };
1298
1299        do {
1300                err = send_bitmap_rle_or_plain(device, &c);
1301        } while (err > 0);
1302
1303        return err == 0;
1304}
1305
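     /* Note the inversion: _drbd_send_bitmap() returns true on success, while
      * drbd_send_bitmap() follows the usual 0-on-success convention. */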
1306int drbd_send_bitmap(struct drbd_device *device)
1307{
1308        struct drbd_socket *sock = &first_peer_device(device)->connection->data;
1309        int err = -1;
1310
1311        mutex_lock(&sock->mutex);
1312        if (sock->socket)
1313                err = !_drbd_send_bitmap(device);
1314        mutex_unlock(&sock->mutex);
1315        return err;
1316}
1317
1318void drbd_send_b_ack(struct drbd_connection *connection, u32 barrier_nr, u32 set_size)
1319{
1320        struct drbd_socket *sock;
1321        struct p_barrier_ack *p;
1322
1323        if (connection->cstate < C_WF_REPORT_PARAMS)
1324                return;
1325
1326        sock = &connection->meta;
1327        p = conn_prepare_command(connection, sock);
1328        if (!p)
1329                return;
1330        p->barrier = barrier_nr;
1331        p->set_size = cpu_to_be32(set_size);
1332        conn_send_command(connection, sock, P_BARRIER_ACK, sizeof(*p), NULL, 0);
1333}
1334
1335/**
1336 * _drbd_send_ack() - Sends an ack packet
 1337 * @peer_device: DRBD peer device.
1338 * @cmd:        Packet command code.
1339 * @sector:     sector, needs to be in big endian byte order
 1340 * @blksize:    size in bytes, needs to be in big endian byte order
1341 * @block_id:   Id, big endian byte order
1342 */
1343static int _drbd_send_ack(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
1344                          u64 sector, u32 blksize, u64 block_id)
1345{
1346        struct drbd_socket *sock;
1347        struct p_block_ack *p;
1348
1349        if (peer_device->device->state.conn < C_CONNECTED)
1350                return -EIO;
1351
1352        sock = &peer_device->connection->meta;
1353        p = drbd_prepare_command(peer_device, sock);
1354        if (!p)
1355                return -EIO;
1356        p->sector = sector;
1357        p->block_id = block_id;
1358        p->blksize = blksize;
1359        p->seq_num = cpu_to_be32(atomic_inc_return(&peer_device->device->packet_seq));
1360        return drbd_send_command(peer_device, sock, cmd, sizeof(*p), NULL, 0);
1361}
1362
1363/* dp->sector and dp->block_id already/still in network byte order,
1364 * data_size is payload size according to dp->head,
1365 * and may need to be corrected for digest size. */
1366void drbd_send_ack_dp(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
1367                      struct p_data *dp, int data_size)
1368{
1369        if (peer_device->connection->peer_integrity_tfm)
1370                data_size -= crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
1371        _drbd_send_ack(peer_device, cmd, dp->sector, cpu_to_be32(data_size),
1372                       dp->block_id);
1373}
1374
1375void drbd_send_ack_rp(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
1376                      struct p_block_req *rp)
1377{
1378        _drbd_send_ack(peer_device, cmd, rp->sector, rp->blksize, rp->block_id);
1379}
1380
1381/**
1382 * drbd_send_ack() - Sends an ack packet
 1383 * @peer_device: DRBD peer device
1384 * @cmd:        packet command code
1385 * @peer_req:   peer request
1386 */
1387int drbd_send_ack(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
1388                  struct drbd_peer_request *peer_req)
1389{
1390        return _drbd_send_ack(peer_device, cmd,
1391                              cpu_to_be64(peer_req->i.sector),
1392                              cpu_to_be32(peer_req->i.size),
1393                              peer_req->block_id);
1394}
1395
1396/* This function misuses the block_id field to signal if the blocks
 1397 * are in sync or not. */
1398int drbd_send_ack_ex(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
1399                     sector_t sector, int blksize, u64 block_id)
1400{
1401        return _drbd_send_ack(peer_device, cmd,
1402                              cpu_to_be64(sector),
1403                              cpu_to_be32(blksize),
1404                              cpu_to_be64(block_id));
1405}
1406
1407int drbd_send_rs_deallocated(struct drbd_peer_device *peer_device,
1408                             struct drbd_peer_request *peer_req)
1409{
1410        struct drbd_socket *sock;
1411        struct p_block_desc *p;
1412
1413        sock = &peer_device->connection->data;
1414        p = drbd_prepare_command(peer_device, sock);
1415        if (!p)
1416                return -EIO;
1417        p->sector = cpu_to_be64(peer_req->i.sector);
1418        p->blksize = cpu_to_be32(peer_req->i.size);
1419        p->pad = 0;
1420        return drbd_send_command(peer_device, sock, P_RS_DEALLOCATED, sizeof(*p), NULL, 0);
1421}
1422
1423int drbd_send_drequest(struct drbd_peer_device *peer_device, int cmd,
1424                       sector_t sector, int size, u64 block_id)
1425{
1426        struct drbd_socket *sock;
1427        struct p_block_req *p;
1428
1429        sock = &peer_device->connection->data;
1430        p = drbd_prepare_command(peer_device, sock);
1431        if (!p)
1432                return -EIO;
1433        p->sector = cpu_to_be64(sector);
1434        p->block_id = block_id;
1435        p->blksize = cpu_to_be32(size);
1436        return drbd_send_command(peer_device, sock, cmd, sizeof(*p), NULL, 0);
1437}
1438
1439int drbd_send_drequest_csum(struct drbd_peer_device *peer_device, sector_t sector, int size,
1440                            void *digest, int digest_size, enum drbd_packet cmd)
1441{
1442        struct drbd_socket *sock;
1443        struct p_block_req *p;
1444
1445        /* FIXME: Put the digest into the preallocated socket buffer.  */
1446
1447        sock = &peer_device->connection->data;
1448        p = drbd_prepare_command(peer_device, sock);
1449        if (!p)
1450                return -EIO;
1451        p->sector = cpu_to_be64(sector);
1452        p->block_id = ID_SYNCER /* unused */;
1453        p->blksize = cpu_to_be32(size);
1454        return drbd_send_command(peer_device, sock, cmd, sizeof(*p), digest, digest_size);
1455}
1456
1457int drbd_send_ov_request(struct drbd_peer_device *peer_device, sector_t sector, int size)
1458{
1459        struct drbd_socket *sock;
1460        struct p_block_req *p;
1461
1462        sock = &peer_device->connection->data;
1463        p = drbd_prepare_command(peer_device, sock);
1464        if (!p)
1465                return -EIO;
1466        p->sector = cpu_to_be64(sector);
1467        p->block_id = ID_SYNCER /* unused */;
1468        p->blksize = cpu_to_be32(size);
1469        return drbd_send_command(peer_device, sock, P_OV_REQUEST, sizeof(*p), NULL, 0);
1470}
1471
1472/* called on sndtimeo
1473 * returns false if we should retry,
1474 * true if we think the connection is dead
1475 */
1476static int we_should_drop_the_connection(struct drbd_connection *connection, struct socket *sock)
1477{
1478        int drop_it;
1479        /* long elapsed = (long)(jiffies - device->last_received); */
1480
1481        drop_it =   connection->meta.socket == sock
1482                || !connection->ack_receiver.task
1483                || get_t_state(&connection->ack_receiver) != RUNNING
1484                || connection->cstate < C_WF_REPORT_PARAMS;
1485
1486        if (drop_it)
1487                return true;
1488
1489        drop_it = !--connection->ko_count;
1490        if (!drop_it) {
1491                drbd_err(connection, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
1492                         current->comm, current->pid, connection->ko_count);
1493                request_ping(connection);
1494        }
1495
1496        return drop_it; /* && (device->state == R_PRIMARY) */
1497}
1498
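/* Mark the data socket as congested once more than 4/5 of its send buffer
 * is queued.  The NET_CONGESTED flag is cleared again in _drbd_send_page()
 * and drbd_send() once the send loop has finished. */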
1499static void drbd_update_congested(struct drbd_connection *connection)
1500{
1501        struct sock *sk = connection->data.socket->sk;
1502        if (sk->sk_wmem_queued > sk->sk_sndbuf * 4 / 5)
1503                set_bit(NET_CONGESTED, &connection->flags);
1504}
1505
1506/* The idea of sendpage seems to be to put some kind of reference
1507 * to the page into the skb, and to hand it over to the NIC. In
1508 * this process get_page() gets called.
1509 *
1510 * As soon as the page was really sent over the network put_page()
1511 * gets called by some part of the network layer. [ NIC driver? ]
1512 *
1513 * [ get_page() / put_page() increment/decrement the count. If count
1514 *   reaches 0 the page will be freed. ]
1515 *
1516 * This works nicely with pages from FSs.
1517 * But this means that in protocol A we might signal IO completion too early!
1518 *
1519 * In order not to corrupt data during a resync we must make sure
1520 * that we do not reuse our own buffer pages (EEs) too early, therefore
1521 * we have the net_ee list.
1522 *
1523 * XFS still seems to have problems with this: it submits pages with page_count == 0!
1524 * As a workaround, we disable sendpage on pages
1525 * with page_count == 0 or PageSlab.
1526 */
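/* Plain copying fallback: kmap the page and push its contents through
 * drbd_send_all() instead of handing a page reference to the network layer. */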
1527static int _drbd_no_send_page(struct drbd_peer_device *peer_device, struct page *page,
1528                              int offset, size_t size, unsigned msg_flags)
1529{
1530        struct socket *socket;
1531        void *addr;
1532        int err;
1533
1534        socket = peer_device->connection->data.socket;
1535        addr = kmap(page) + offset;
1536        err = drbd_send_all(peer_device->connection, socket, addr, size, msg_flags);
1537        kunmap(page);
1538        if (!err)
1539                peer_device->device->send_cnt += size >> 9;
1540        return err;
1541}
1542
1543static int _drbd_send_page(struct drbd_peer_device *peer_device, struct page *page,
1544                    int offset, size_t size, unsigned msg_flags)
1545{
1546        struct socket *socket = peer_device->connection->data.socket;
1547        int len = size;
1548        int err = -EIO;
1549
1550        /* e.g. XFS meta- & log-data is in slab pages, which have a
1551         * page_count of 0 and/or have PageSlab() set.
1552         * we cannot use send_page for those, as that does get_page();
1553         * put_page(); and would cause either a VM_BUG directly, or
1554         * __page_cache_release a page that would actually still be referenced
1555         * by someone, leading to some obscure delayed Oops somewhere else. */
1556        if (drbd_disable_sendpage || (page_count(page) < 1) || PageSlab(page))
1557                return _drbd_no_send_page(peer_device, page, offset, size, msg_flags);
1558
1559        msg_flags |= MSG_NOSIGNAL;
1560        drbd_update_congested(peer_device->connection);
1561        do {
1562                int sent;
1563
1564                sent = socket->ops->sendpage(socket, page, offset, len, msg_flags);
1565                if (sent <= 0) {
1566                        if (sent == -EAGAIN) {
1567                                if (we_should_drop_the_connection(peer_device->connection, socket))
1568                                        break;
1569                                continue;
1570                        }
1571                        drbd_warn(peer_device->device, "%s: size=%d len=%d sent=%d\n",
1572                             __func__, (int)size, len, sent);
1573                        if (sent < 0)
1574                                err = sent;
1575                        break;
1576                }
1577                len    -= sent;
1578                offset += sent;
1579        } while (len > 0 /* THINK && device->cstate >= C_CONNECTED*/);
1580        clear_bit(NET_CONGESTED, &peer_device->connection->flags);
1581
1582        if (len == 0) {
1583                err = 0;
1584                peer_device->device->send_cnt += size >> 9;
1585        }
1586        return err;
1587}
1588
1589static int _drbd_send_bio(struct drbd_peer_device *peer_device, struct bio *bio)
1590{
1591        struct bio_vec bvec;
1592        struct bvec_iter iter;
1593
1594        /* hint all but last page with MSG_MORE */
1595        bio_for_each_segment(bvec, bio, iter) {
1596                int err;
1597
1598                err = _drbd_no_send_page(peer_device, bvec.bv_page,
1599                                         bvec.bv_offset, bvec.bv_len,
1600                                         bio_iter_last(bvec, iter)
1601                                         ? 0 : MSG_MORE);
1602                if (err)
1603                        return err;
1604                /* REQ_OP_WRITE_SAME has only one segment */
1605                if (bio_op(bio) == REQ_OP_WRITE_SAME)
1606                        break;
1607        }
1608        return 0;
1609}
1610
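/* Like _drbd_send_bio(), but uses the (potentially zero-copy)
 * _drbd_send_page() path for each segment. */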
1611static int _drbd_send_zc_bio(struct drbd_peer_device *peer_device, struct bio *bio)
1612{
1613        struct bio_vec bvec;
1614        struct bvec_iter iter;
1615
1616        /* hint all but last page with MSG_MORE */
1617        bio_for_each_segment(bvec, bio, iter) {
1618                int err;
1619
1620                err = _drbd_send_page(peer_device, bvec.bv_page,
1621                                      bvec.bv_offset, bvec.bv_len,
1622                                      bio_iter_last(bvec, iter) ? 0 : MSG_MORE);
1623                if (err)
1624                        return err;
1625                /* REQ_OP_WRITE_SAME has only one segment */
1626                if (bio_op(bio) == REQ_OP_WRITE_SAME)
1627                        break;
1628        }
1629        return 0;
1630}
1631
1632static int _drbd_send_zc_ee(struct drbd_peer_device *peer_device,
1633                            struct drbd_peer_request *peer_req)
1634{
1635        struct page *page = peer_req->pages;
1636        unsigned len = peer_req->i.size;
1637        int err;
1638
1639        /* hint all but last page with MSG_MORE */
1640        page_chain_for_each(page) {
1641                unsigned l = min_t(unsigned, len, PAGE_SIZE);
1642
1643                err = _drbd_send_page(peer_device, page, 0, l,
1644                                      page_chain_next(page) ? MSG_MORE : 0);
1645                if (err)
1646                        return err;
1647                len -= l;
1648        }
1649        return 0;
1650}
1651
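/* Translate the operation and flags of a bio into the DP_* flags we put on
 * the wire; peers with an agreed protocol version < 95 only understand
 * DP_RW_SYNC. */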
1652static u32 bio_flags_to_wire(struct drbd_connection *connection,
1653                             struct bio *bio)
1654{
1655        if (connection->agreed_pro_version >= 95)
1656                return  (bio->bi_opf & REQ_SYNC ? DP_RW_SYNC : 0) |
1657                        (bio->bi_opf & REQ_FUA ? DP_FUA : 0) |
1658                        (bio->bi_opf & REQ_PREFLUSH ? DP_FLUSH : 0) |
1659                        (bio_op(bio) == REQ_OP_WRITE_SAME ? DP_WSAME : 0) |
1660                        (bio_op(bio) == REQ_OP_DISCARD ? DP_DISCARD : 0) |
1661                        (bio_op(bio) == REQ_OP_WRITE_ZEROES ?
1662                          ((connection->agreed_features & DRBD_FF_WZEROES) ?
1663                           (DP_ZEROES |(!(bio->bi_opf & REQ_NOUNMAP) ? DP_DISCARD : 0))
1664                           : DP_DISCARD)
1665                        : 0);
1666        else
1667                return bio->bi_opf & REQ_SYNC ? DP_RW_SYNC : 0;
1668}
1669
1670/* Used to send write, TRIM (REQ_OP_DISCARD), write-zeroes and write-same requests
1671 * R_PRIMARY -> Peer    (P_DATA, P_TRIM, P_ZEROES, P_WSAME)
1672 */
1673int drbd_send_dblock(struct drbd_peer_device *peer_device, struct drbd_request *req)
1674{
1675        struct drbd_device *device = peer_device->device;
1676        struct drbd_socket *sock;
1677        struct p_data *p;
1678        struct p_wsame *wsame = NULL;
1679        void *digest_out;
1680        unsigned int dp_flags = 0;
1681        int digest_size;
1682        int err;
1683
1684        sock = &peer_device->connection->data;
1685        p = drbd_prepare_command(peer_device, sock);
1686        digest_size = peer_device->connection->integrity_tfm ?
1687                      crypto_shash_digestsize(peer_device->connection->integrity_tfm) : 0;
1688
1689        if (!p)
1690                return -EIO;
1691        p->sector = cpu_to_be64(req->i.sector);
1692        p->block_id = (unsigned long)req;
1693        p->seq_num = cpu_to_be32(atomic_inc_return(&device->packet_seq));
1694        dp_flags = bio_flags_to_wire(peer_device->connection, req->master_bio);
1695        if (device->state.conn >= C_SYNC_SOURCE &&
1696            device->state.conn <= C_PAUSED_SYNC_T)
1697                dp_flags |= DP_MAY_SET_IN_SYNC;
1698        if (peer_device->connection->agreed_pro_version >= 100) {
1699                if (req->rq_state & RQ_EXP_RECEIVE_ACK)
1700                        dp_flags |= DP_SEND_RECEIVE_ACK;
1701                /* During resync, request an explicit write ack,
1702                 * even in protocol != C */
1703                if (req->rq_state & RQ_EXP_WRITE_ACK
1704                || (dp_flags & DP_MAY_SET_IN_SYNC))
1705                        dp_flags |= DP_SEND_WRITE_ACK;
1706        }
1707        p->dp_flags = cpu_to_be32(dp_flags);
1708
1709        if (dp_flags & (DP_DISCARD|DP_ZEROES)) {
1710                enum drbd_packet cmd = (dp_flags & DP_ZEROES) ? P_ZEROES : P_TRIM;
1711                struct p_trim *t = (struct p_trim*)p;
1712                t->size = cpu_to_be32(req->i.size);
1713                err = __send_command(peer_device->connection, device->vnr, sock, cmd, sizeof(*t), NULL, 0);
1714                goto out;
1715        }
1716        if (dp_flags & DP_WSAME) {
1717                /* this will only work if DRBD_FF_WSAME is set AND the
1718                 * handshake agreed that all nodes and backend devices are
1719                 * WRITE_SAME capable and agree on logical_block_size */
1720                wsame = (struct p_wsame*)p;
1721                digest_out = wsame + 1;
1722                wsame->size = cpu_to_be32(req->i.size);
1723        } else
1724                digest_out = p + 1;
1725
1726        /* our digest is still only over the payload.
1727         * TRIM does not carry any payload. */
1728        if (digest_size)
1729                drbd_csum_bio(peer_device->connection->integrity_tfm, req->master_bio, digest_out);
1730        if (wsame) {
1731                err =
1732                    __send_command(peer_device->connection, device->vnr, sock, P_WSAME,
1733                                   sizeof(*wsame) + digest_size, NULL,
1734                                   bio_iovec(req->master_bio).bv_len);
1735        } else
1736                err =
1737                    __send_command(peer_device->connection, device->vnr, sock, P_DATA,
1738                                   sizeof(*p) + digest_size, NULL, req->i.size);
1739        if (!err) {
1740                /* For protocol A, we have to memcpy the payload into
1741                 * socket buffers, as we may complete right away,
1742                 * as soon as we have handed it over to tcp, at which point the data
1743                 * pages may become invalid.
1744                 *
1745                 * With data integrity enabled, we copy it as well, so we can be
1746                 * sure that even if the bio pages are still being modified, this
1747                 * won't change the data on the wire.  Thus, if the digest checks
1748                 * out ok after sending on this side, but does not match on the
1749                 * receiving side, we know the corruption happened elsewhere.
1750                 */
1751                if (!(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK)) || digest_size)
1752                        err = _drbd_send_bio(peer_device, req->master_bio);
1753                else
1754                        err = _drbd_send_zc_bio(peer_device, req->master_bio);
1755
1756                /* double check digest, sometimes buffers have been modified in flight. */
1757                if (digest_size > 0 && digest_size <= 64) {
1758                        /* 64 byte, 512 bit, is the largest digest size
1759                         * currently supported in kernel crypto. */
1760                        unsigned char digest[64];
1761                        drbd_csum_bio(peer_device->connection->integrity_tfm, req->master_bio, digest);
1762                        if (memcmp(p + 1, digest, digest_size)) {
1763                                drbd_warn(device,
1764                                        "Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
1765                                        (unsigned long long)req->i.sector, req->i.size);
1766                        }
1767                } /* else if (digest_size > 64) {
1768                     ... Be noisy about digest too large ...
1769                } */
1770        }
1771out:
1772        mutex_unlock(&sock->mutex);  /* locked by drbd_prepare_command() */
1773
1774        return err;
1775}
1776
1777/* answer packet, used to send data back for read requests:
1778 *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
1779 *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
1780 */
1781int drbd_send_block(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
1782                    struct drbd_peer_request *peer_req)
1783{
1784        struct drbd_device *device = peer_device->device;
1785        struct drbd_socket *sock;
1786        struct p_data *p;
1787        int err;
1788        int digest_size;
1789
1790        sock = &peer_device->connection->data;
1791        p = drbd_prepare_command(peer_device, sock);
1792
1793        digest_size = peer_device->connection->integrity_tfm ?
1794                      crypto_shash_digestsize(peer_device->connection->integrity_tfm) : 0;
1795
1796        if (!p)
1797                return -EIO;
1798        p->sector = cpu_to_be64(peer_req->i.sector);
1799        p->block_id = peer_req->block_id;
1800        p->seq_num = 0;  /* unused */
1801        p->dp_flags = 0;
1802        if (digest_size)
1803                drbd_csum_ee(peer_device->connection->integrity_tfm, peer_req, p + 1);
1804        err = __send_command(peer_device->connection, device->vnr, sock, cmd, sizeof(*p) + digest_size, NULL, peer_req->i.size);
1805        if (!err)
1806                err = _drbd_send_zc_ee(peer_device, peer_req);
1807        mutex_unlock(&sock->mutex);  /* locked by drbd_prepare_command() */
1808
1809        return err;
1810}
1811
1812int drbd_send_out_of_sync(struct drbd_peer_device *peer_device, struct drbd_request *req)
1813{
1814        struct drbd_socket *sock;
1815        struct p_block_desc *p;
1816
1817        sock = &peer_device->connection->data;
1818        p = drbd_prepare_command(peer_device, sock);
1819        if (!p)
1820                return -EIO;
1821        p->sector = cpu_to_be64(req->i.sector);
1822        p->blksize = cpu_to_be32(req->i.size);
1823        return drbd_send_command(peer_device, sock, P_OUT_OF_SYNC, sizeof(*p), NULL, 0);
1824}
1825
1826/*
1827  drbd_send distinguishes two cases:
1828
1829  Packets sent via the data socket "sock"
1830  and packets sent via the meta data socket "msock"
1831
1832                    sock                      msock
1833  -----------------+-------------------------+------------------------------
1834  timeout           conf.timeout / 2          conf.timeout / 2
1835  timeout action    send a ping via msock     Abort communication
1836                                              and close all sockets
1837*/
1838
1839/*
1840 * you must have down()ed the appropriate [m]sock_mutex elsewhere!
1841 */
1842int drbd_send(struct drbd_connection *connection, struct socket *sock,
1843              void *buf, size_t size, unsigned msg_flags)
1844{
1845        struct kvec iov = {.iov_base = buf, .iov_len = size};
1846        struct msghdr msg = {.msg_flags = msg_flags | MSG_NOSIGNAL};
1847        int rv, sent = 0;
1848
1849        if (!sock)
1850                return -EBADR;
1851
1852        /* THINK  if (signal_pending) return ... ? */
1853
1854        iov_iter_kvec(&msg.msg_iter, WRITE, &iov, 1, size);
1855
1856        if (sock == connection->data.socket) {
1857                rcu_read_lock();
1858                connection->ko_count = rcu_dereference(connection->net_conf)->ko_count;
1859                rcu_read_unlock();
1860                drbd_update_congested(connection);
1861        }
1862        do {
1863                rv = sock_sendmsg(sock, &msg);
1864                if (rv == -EAGAIN) {
1865                        if (we_should_drop_the_connection(connection, sock))
1866                                break;
1867                        else
1868                                continue;
1869                }
1870                if (rv == -EINTR) {
1871                        flush_signals(current);
1872                        rv = 0;
1873                }
1874                if (rv < 0)
1875                        break;
1876                sent += rv;
1877        } while (sent < size);
1878
1879        if (sock == connection->data.socket)
1880                clear_bit(NET_CONGESTED, &connection->flags);
1881
1882        if (rv <= 0) {
1883                if (rv != -EAGAIN) {
1884                        drbd_err(connection, "%s_sendmsg returned %d\n",
1885                                 sock == connection->meta.socket ? "msock" : "sock",
1886                                 rv);
1887                        conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
1888                } else
1889                        conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
1890        }
1891
1892        return sent;
1893}
1894
1895/**
1896 * drbd_send_all  -  Send an entire buffer
1897 *
1898 * Returns 0 upon success and a negative error value otherwise.
1899 */
1900int drbd_send_all(struct drbd_connection *connection, struct socket *sock, void *buffer,
1901                  size_t size, unsigned msg_flags)
1902{
1903        int err;
1904
1905        err = drbd_send(connection, sock, buffer, size, msg_flags);
1906        if (err < 0)
1907                return err;
1908        if (err != size)
1909                return -EIO;
1910        return 0;
1911}
1912
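/* block_device_operations .open: opening for write is only allowed while we
 * are Primary; read-only opens on a Secondary are rejected as well unless
 * the allow_oos module parameter is set. */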
1913static int drbd_open(struct block_device *bdev, fmode_t mode)
1914{
1915        struct drbd_device *device = bdev->bd_disk->private_data;
1916        unsigned long flags;
1917        int rv = 0;
1918
1919        mutex_lock(&drbd_main_mutex);
1920        spin_lock_irqsave(&device->resource->req_lock, flags);
1921        /* to have a stable device->state.role
1922         * and no race with updating open_cnt */
1923
1924        if (device->state.role != R_PRIMARY) {
1925                if (mode & FMODE_WRITE)
1926                        rv = -EROFS;
1927                else if (!drbd_allow_oos)
1928                        rv = -EMEDIUMTYPE;
1929        }
1930
1931        if (!rv)
1932                device->open_cnt++;
1933        spin_unlock_irqrestore(&device->resource->req_lock, flags);
1934        mutex_unlock(&drbd_main_mutex);
1935
1936        return rv;
1937}
1938
1939static void drbd_release(struct gendisk *gd, fmode_t mode)
1940{
1941        struct drbd_device *device = gd->private_data;
1942        mutex_lock(&drbd_main_mutex);
1943        device->open_cnt--;
1944        mutex_unlock(&drbd_main_mutex);
1945}
1946
1947/* need to hold resource->req_lock */
1948void drbd_queue_unplug(struct drbd_device *device)
1949{
1950        if (device->state.pdsk >= D_INCONSISTENT && device->state.conn >= C_CONNECTED) {
1951                D_ASSERT(device, device->state.role == R_PRIMARY);
1952                if (test_and_clear_bit(UNPLUG_REMOTE, &device->flags)) {
1953                        drbd_queue_work_if_unqueued(
1954                                &first_peer_device(device)->connection->sender_work,
1955                                &device->unplug_work);
1956                }
1957        }
1958}
1959
1960static void drbd_set_defaults(struct drbd_device *device)
1961{
1962        /* Beware! The actual layout differs
1963         * between big endian and little endian */
1964        device->state = (union drbd_dev_state) {
1965                { .role = R_SECONDARY,
1966                  .peer = R_UNKNOWN,
1967                  .conn = C_STANDALONE,
1968                  .disk = D_DISKLESS,
1969                  .pdsk = D_UNKNOWN,
1970                } };
1971}
1972
1973void drbd_init_set_defaults(struct drbd_device *device)
1974{
1975        /* the memset(,0,) did most of this.
1976         * note: only assignments, no allocation in here */
1977
1978        drbd_set_defaults(device);
1979
1980        atomic_set(&device->ap_bio_cnt, 0);
1981        atomic_set(&device->ap_actlog_cnt, 0);
1982        atomic_set(&device->ap_pending_cnt, 0);
1983        atomic_set(&device->rs_pending_cnt, 0);
1984        atomic_set(&device->unacked_cnt, 0);
1985        atomic_set(&device->local_cnt, 0);
1986        atomic_set(&device->pp_in_use_by_net, 0);
1987        atomic_set(&device->rs_sect_in, 0);
1988        atomic_set(&device->rs_sect_ev, 0);
1989        atomic_set(&device->ap_in_flight, 0);
1990        atomic_set(&device->md_io.in_use, 0);
1991
1992        mutex_init(&device->own_state_mutex);
1993        device->state_mutex = &device->own_state_mutex;
1994
1995        spin_lock_init(&device->al_lock);
1996        spin_lock_init(&device->peer_seq_lock);
1997
1998        INIT_LIST_HEAD(&device->active_ee);
1999        INIT_LIST_HEAD(&device->sync_ee);
2000        INIT_LIST_HEAD(&device->done_ee);
2001        INIT_LIST_HEAD(&device->read_ee);
2002        INIT_LIST_HEAD(&device->net_ee);
2003        INIT_LIST_HEAD(&device->resync_reads);
2004        INIT_LIST_HEAD(&device->resync_work.list);
2005        INIT_LIST_HEAD(&device->unplug_work.list);
2006        INIT_LIST_HEAD(&device->bm_io_work.w.list);
2007        INIT_LIST_HEAD(&device->pending_master_completion[0]);
2008        INIT_LIST_HEAD(&device->pending_master_completion[1]);
2009        INIT_LIST_HEAD(&device->pending_completion[0]);
2010        INIT_LIST_HEAD(&device->pending_completion[1]);
2011
2012        device->resync_work.cb  = w_resync_timer;
2013        device->unplug_work.cb  = w_send_write_hint;
2014        device->bm_io_work.w.cb = w_bitmap_io;
2015
2016        timer_setup(&device->resync_timer, resync_timer_fn, 0);
2017        timer_setup(&device->md_sync_timer, md_sync_timer_fn, 0);
2018        timer_setup(&device->start_resync_timer, start_resync_timer_fn, 0);
2019        timer_setup(&device->request_timer, request_timer_fn, 0);
2020
2021        init_waitqueue_head(&device->misc_wait);
2022        init_waitqueue_head(&device->state_wait);
2023        init_waitqueue_head(&device->ee_wait);
2024        init_waitqueue_head(&device->al_wait);
2025        init_waitqueue_head(&device->seq_wait);
2026
2027        device->resync_wenr = LC_FREE;
2028        device->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
2029        device->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
2030}
2031
2032static void _drbd_set_my_capacity(struct drbd_device *device, sector_t size)
2033{
2034        /* set_capacity(device->this_bdev->bd_disk, size); */
2035        set_capacity(device->vdisk, size);
2036        device->this_bdev->bd_inode->i_size = (loff_t)size << 9;
2037}
2038
2039void drbd_set_my_capacity(struct drbd_device *device, sector_t size)
2040{
2041        char ppb[10];
2042        _drbd_set_my_capacity(device, size);
2043        drbd_info(device, "size = %s (%llu KB)\n",
2044                ppsize(ppb, size>>1), (unsigned long long)size>>1);
2045}
2046
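/* Reset a device back to its unconfigured state: clear the I/O and resync
 * counters, shrink and release the bitmap, free the backing device, and
 * re-apply the state defaults.  The EE lists are expected to be empty. */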
2047void drbd_device_cleanup(struct drbd_device *device)
2048{
2049        int i;
2050        if (first_peer_device(device)->connection->receiver.t_state != NONE)
2051                drbd_err(device, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
2052                                first_peer_device(device)->connection->receiver.t_state);
2053
2054        device->al_writ_cnt  =
2055        device->bm_writ_cnt  =
2056        device->read_cnt     =
2057        device->recv_cnt     =
2058        device->send_cnt     =
2059        device->writ_cnt     =
2060        device->p_size       =
2061        device->rs_start     =
2062        device->rs_total     =
2063        device->rs_failed    = 0;
2064        device->rs_last_events = 0;
2065        device->rs_last_sect_ev = 0;
2066        for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2067                device->rs_mark_left[i] = 0;
2068                device->rs_mark_time[i] = 0;
2069        }
2070        D_ASSERT(device, first_peer_device(device)->connection->net_conf == NULL);
2071
2072        _drbd_set_my_capacity(device, 0);
2073        if (device->bitmap) {
2074                /* maybe never allocated. */
2075                drbd_bm_resize(device, 0, 1);
2076                drbd_bm_cleanup(device);
2077        }
2078
2079        drbd_backing_dev_free(device, device->ldev);
2080        device->ldev = NULL;
2081
2082        clear_bit(AL_SUSPENDED, &device->flags);
2083
2084        D_ASSERT(device, list_empty(&device->active_ee));
2085        D_ASSERT(device, list_empty(&device->sync_ee));
2086        D_ASSERT(device, list_empty(&device->done_ee));
2087        D_ASSERT(device, list_empty(&device->read_ee));
2088        D_ASSERT(device, list_empty(&device->net_ee));
2089        D_ASSERT(device, list_empty(&device->resync_reads));
2090        D_ASSERT(device, list_empty(&first_peer_device(device)->connection->sender_work.q));
2091        D_ASSERT(device, list_empty(&device->resync_work.list));
2092        D_ASSERT(device, list_empty(&device->unplug_work.list));
2093
2094        drbd_set_defaults(device);
2095}
2096
2097
2098static void drbd_destroy_mempools(void)
2099{
2100        struct page *page;
2101
2102        while (drbd_pp_pool) {
2103                page = drbd_pp_pool;
2104                drbd_pp_pool = (struct page *)page_private(page);
2105                __free_page(page);
2106                drbd_pp_vacant--;
2107        }
2108
2109        /* D_ASSERT(device, atomic_read(&drbd_pp_vacant)==0); */
2110
2111        bioset_exit(&drbd_io_bio_set);
2112        bioset_exit(&drbd_md_io_bio_set);
2113        mempool_exit(&drbd_md_io_page_pool);
2114        mempool_exit(&drbd_ee_mempool);
2115        mempool_exit(&drbd_request_mempool);
2116        kmem_cache_destroy(drbd_ee_cache);
2117        kmem_cache_destroy(drbd_request_cache);
2118        kmem_cache_destroy(drbd_bm_ext_cache);
2119        kmem_cache_destroy(drbd_al_ext_cache);
2120
2121        drbd_ee_cache        = NULL;
2122        drbd_request_cache   = NULL;
2123        drbd_bm_ext_cache    = NULL;
2124        drbd_al_ext_cache    = NULL;
2125
2126        return;
2127}
2128
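/* Allocate the slab caches, bio sets, mempools and drbd's own page pool.
 * The pool is sized to hold DRBD_MAX_BIO_SIZE worth of pages per configured
 * minor (drbd_minor_count). */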
2129static int drbd_create_mempools(void)
2130{
2131        struct page *page;
2132        const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * drbd_minor_count;
2133        int i, ret;
2134
2135        /* caches */
2136        drbd_request_cache = kmem_cache_create(
2137                "drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2138        if (drbd_request_cache == NULL)
2139                goto Enomem;
2140
2141        drbd_ee_cache = kmem_cache_create(
2142                "drbd_ee", sizeof(struct drbd_peer_request), 0, 0, NULL);
2143        if (drbd_ee_cache == NULL)
2144                goto Enomem;
2145
2146        drbd_bm_ext_cache = kmem_cache_create(
2147                "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2148        if (drbd_bm_ext_cache == NULL)
2149                goto Enomem;
2150
2151        drbd_al_ext_cache = kmem_cache_create(
2152                "drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2153        if (drbd_al_ext_cache == NULL)
2154                goto Enomem;
2155
2156        /* mempools */
2157        ret = bioset_init(&drbd_io_bio_set, BIO_POOL_SIZE, 0, 0);
2158        if (ret)
2159                goto Enomem;
2160
2161        ret = bioset_init(&drbd_md_io_bio_set, DRBD_MIN_POOL_PAGES, 0,
2162                          BIOSET_NEED_BVECS);
2163        if (ret)
2164                goto Enomem;
2165
2166        ret = mempool_init_page_pool(&drbd_md_io_page_pool, DRBD_MIN_POOL_PAGES, 0);
2167        if (ret)
2168                goto Enomem;
2169
2170        ret = mempool_init_slab_pool(&drbd_request_mempool, number,
2171                                     drbd_request_cache);
2172        if (ret)
2173                goto Enomem;
2174
2175        ret = mempool_init_slab_pool(&drbd_ee_mempool, number, drbd_ee_cache);
2176        if (ret)
2177                goto Enomem;
2178
2179        /* drbd's page pool */
2180        spin_lock_init(&drbd_pp_lock);
2181
2182        for (i = 0; i < number; i++) {
2183                page = alloc_page(GFP_HIGHUSER);
2184                if (!page)
2185                        goto Enomem;
2186                set_page_private(page, (unsigned long)drbd_pp_pool);
2187                drbd_pp_pool = page;
2188        }
2189        drbd_pp_vacant = number;
2190
2191        return 0;
2192
2193Enomem:
2194        drbd_destroy_mempools(); /* in case we allocated some */
2195        return -ENOMEM;
2196}
2197
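/* Drain all per-device peer request (EE) lists.  They should already be
 * empty at this point; anything actually freed here is reported as an error. */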
2198static void drbd_release_all_peer_reqs(struct drbd_device *device)
2199{
2200        int rr;
2201
2202        rr = drbd_free_peer_reqs(device, &device->active_ee);
2203        if (rr)
2204                drbd_err(device, "%d EEs in active list found!\n", rr);
2205
2206        rr = drbd_free_peer_reqs(device, &device->sync_ee);
2207        if (rr)
2208                drbd_err(device, "%d EEs in sync list found!\n", rr);
2209
2210        rr = drbd_free_peer_reqs(device, &device->read_ee);
2211        if (rr)
2212                drbd_err(device, "%d EEs in read list found!\n", rr);
2213
2214        rr = drbd_free_peer_reqs(device, &device->done_ee);
2215        if (rr)
2216                drbd_err(device, "%d EEs in done list found!\n", rr);
2217
2218        rr = drbd_free_peer_reqs(device, &device->net_ee);
2219        if (rr)
2220                drbd_err(device, "%d EEs in net list found!\n", rr);
2221}
2222
2223/* caution. no locking. */
2224void drbd_destroy_device(struct kref *kref)
2225{
2226        struct drbd_device *device = container_of(kref, struct drbd_device, kref);
2227        struct drbd_resource *resource = device->resource;
2228        struct drbd_peer_device *peer_device, *tmp_peer_device;
2229
2230        del_timer_sync(&device->request_timer);
2231
2232        /* paranoia asserts */
2233        D_ASSERT(device, device->open_cnt == 0);
2234        /* end paranoia asserts */
2235
2236        /* cleanup stuff that may have been allocated during
2237         * device (re-)configuration or state changes */
2238
2239        if (device->this_bdev)
2240                bdput(device->this_bdev);
2241
2242        drbd_backing_dev_free(device, device->ldev);
2243        device->ldev = NULL;
2244
2245        drbd_release_all_peer_reqs(device);
2246
2247        lc_destroy(device->act_log);
2248        lc_destroy(device->resync);
2249
2250        kfree(device->p_uuid);
2251        /* device->p_uuid = NULL; */
2252
2253        if (device->bitmap) /* should no longer be there. */
2254                drbd_bm_cleanup(device);
2255        __free_page(device->md_io.page);
2256        put_disk(device->vdisk);
2257        blk_cleanup_queue(device->rq_queue);
2258        kfree(device->rs_plan_s);
2259
2260        /* not for_each_connection(connection, resource):
2261         * those may have been cleaned up and disassociated already.
2262         */
2263        for_each_peer_device_safe(peer_device, tmp_peer_device, device) {
2264                kref_put(&peer_device->connection->kref, drbd_destroy_connection);
2265                kfree(peer_device);
2266        }
2267        memset(device, 0xfd, sizeof(*device));
2268        kfree(device);
2269        kref_put(&resource->kref, drbd_destroy_resource);
2270}
2271
2272/* One global retry thread, if we need to push back some bio and have it
2273 * reinserted through our make request function.
2274 */
2275static struct retry_worker {
2276        struct workqueue_struct *wq;
2277        struct work_struct worker;
2278
2279        spinlock_t lock;
2280        struct list_head writes;
2281} retry;
2282
2283static void do_retry(struct work_struct *ws)
2284{
2285        struct retry_worker *retry = container_of(ws, struct retry_worker, worker);
2286        LIST_HEAD(writes);
2287        struct drbd_request *req, *tmp;
2288
2289        spin_lock_irq(&retry->lock);
2290        list_splice_init(&retry->writes, &writes);
2291        spin_unlock_irq(&retry->lock);
2292
2293        list_for_each_entry_safe(req, tmp, &writes, tl_requests) {
2294                struct drbd_device *device = req->device;
2295                struct bio *bio = req->master_bio;
2296                unsigned long start_jif = req->start_jif;
2297                bool expected;
2298
2299                expected =
2300                        expect(atomic_read(&req->completion_ref) == 0) &&
2301                        expect(req->rq_state & RQ_POSTPONED) &&
2302                        expect((req->rq_state & RQ_LOCAL_PENDING) == 0 ||
2303                                (req->rq_state & RQ_LOCAL_ABORTED) != 0);
2304
2305                if (!expected)
2306                        drbd_err(device, "req=%p completion_ref=%d rq_state=%x\n",
2307                                req, atomic_read(&req->completion_ref),
2308                                req->rq_state);
2309
2310                /* We still need to put one kref associated with the
2311                 * "completion_ref" going zero in the code path that queued it
2312                 * here.  The request object may still be referenced by a
2313                 * frozen local req->private_bio, in case we force-detached.
2314                 */
2315                kref_put(&req->kref, drbd_req_destroy);
2316
2317                /* A single suspended or otherwise blocking device may stall
2318                 * all others as well.  Fortunately, this code path is to
2319                 * recover from a situation that "should not happen":
2320                 * concurrent writes in multi-primary setup.
2321                 * In a "normal" lifecycle, this workqueue is supposed to be
2322                 * destroyed without ever doing anything.
2323                 * If it turns out to be an issue anyway, we can do per
2324                 * resource (replication group) or per device (minor) retry
2325                 * workqueues instead.
2326                 */
2327
2328                /* We are not just doing generic_make_request(),
2329                 * as we want to keep the start_time information. */
2330                inc_ap_bio(device);
2331                __drbd_make_request(device, bio, start_jif);
2332        }
2333}
2334
2335/* called via drbd_req_put_completion_ref(),
2336 * holds resource->req_lock */
2337void drbd_restart_request(struct drbd_request *req)
2338{
2339        unsigned long flags;
2340        spin_lock_irqsave(&retry.lock, flags);
2341        list_move_tail(&req->tl_requests, &retry.writes);
2342        spin_unlock_irqrestore(&retry.lock, flags);
2343
2344        /* Drop the extra reference that would otherwise
2345         * have been dropped by complete_master_bio.
2346         * do_retry() needs to grab a new one. */
2347        dec_ap_bio(req->device);
2348
2349        queue_work(retry.wq, &retry.worker);
2350}
2351
2352void drbd_destroy_resource(struct kref *kref)
2353{
2354        struct drbd_resource *resource =
2355                container_of(kref, struct drbd_resource, kref);
2356
2357        idr_destroy(&resource->devices);
2358        free_cpumask_var(resource->cpu_mask);
2359        kfree(resource->name);
2360        memset(resource, 0xf2, sizeof(*resource));
2361        kfree(resource);
2362}
2363
2364void drbd_free_resource(struct drbd_resource *resource)
2365{
2366        struct drbd_connection *connection, *tmp;
2367
2368        for_each_connection_safe(connection, tmp, resource) {
2369                list_del(&connection->connections);
2370                drbd_debugfs_connection_cleanup(connection);
2371                kref_put(&connection->kref, drbd_destroy_connection);
2372        }
2373        drbd_debugfs_resource_cleanup(resource);
2374        kref_put(&resource->kref, drbd_destroy_resource);
2375}
2376
2377static void drbd_cleanup(void)
2378{
2379        unsigned int i;
2380        struct drbd_device *device;
2381        struct drbd_resource *resource, *tmp;
2382
2383        /* first remove proc,
2384         * drbdsetup uses its presence to detect
2385         * whether DRBD is loaded.
2386         * If we would get stuck in proc removal,
2387         * but have netlink already deregistered,
2388         * some drbdsetup commands may wait forever
2389         * for an answer.
2390         */
2391        if (drbd_proc)
2392                remove_proc_entry("drbd", NULL);
2393
2394        if (retry.wq)
2395                destroy_workqueue(retry.wq);
2396
2397        drbd_genl_unregister();
2398
2399        idr_for_each_entry(&drbd_devices, device, i)
2400                drbd_delete_device(device);
2401
2402        /* not _rcu, since there is no other updater anymore; genl is already unregistered */
2403        for_each_resource_safe(resource, tmp, &drbd_resources) {
2404                list_del(&resource->resources);
2405                drbd_free_resource(resource);
2406        }
2407
2408        drbd_debugfs_cleanup();
2409
2410        drbd_destroy_mempools();
2411        unregister_blkdev(DRBD_MAJOR, "drbd");
2412
2413        idr_destroy(&drbd_devices);
2414
2415        pr_info("module cleanup done.\n");
2416}
2417
2418/**
2419 * drbd_congested() - Callback for the flusher thread
2420 * @congested_data:     User data
2421 * @bdi_bits:           Bits the BDI flusher thread is currently interested in
2422 *
2423 * Returns 1<<WB_async_congested and/or 1<<WB_sync_congested if we are congested.
2424 */
2425static int drbd_congested(void *congested_data, int bdi_bits)
2426{
2427        struct drbd_device *device = congested_data;
2428        struct request_queue *q;
2429        char reason = '-';
2430        int r = 0;
2431
2432        if (!may_inc_ap_bio(device)) {
2433                /* DRBD has frozen IO */
2434                r = bdi_bits;
2435                reason = 'd';
2436                goto out;
2437        }
2438
2439        if (test_bit(CALLBACK_PENDING, &first_peer_device(device)->connection->flags)) {
2440                r |= (1 << WB_async_congested);
2441                /* Without good local data, we would need to read from remote,
2442                 * and that would need the worker thread as well, which is
2443                 * currently blocked waiting for that usermode helper to
2444                 * finish.
2445                 */
2446                if (!get_ldev_if_state(device, D_UP_TO_DATE))
2447                        r |= (1 << WB_sync_congested);
2448                else
2449                        put_ldev(device);
2450                r &= bdi_bits;
2451                reason = 'c';
2452                goto out;
2453        }
2454
2455        if (get_ldev(device)) {
2456                q = bdev_get_queue(device->ldev->backing_bdev);
2457                r = bdi_congested(q->backing_dev_info, bdi_bits);
2458                put_ldev(device);
2459                if (r)
2460                        reason = 'b';
2461        }
2462
2463        if (bdi_bits & (1 << WB_async_congested) &&
2464            test_bit(NET_CONGESTED, &first_peer_device(device)->connection->flags)) {
2465                r |= (1 << WB_async_congested);
2466                reason = reason == 'b' ? 'a' : 'n';
2467        }
2468
2469out:
2470        device->congestion_reason = reason;
2471        return r;
2472}
2473
2474static void drbd_init_workqueue(struct drbd_work_queue* wq)
2475{
2476        spin_lock_init(&wq->q_lock);
2477        INIT_LIST_HEAD(&wq->q);
2478        init_waitqueue_head(&wq->q_wait);
2479}
2480
2481struct completion_work {
2482        struct drbd_work w;
2483        struct completion done;
2484};
2485
2486static int w_complete(struct drbd_work *w, int cancel)
2487{
2488        struct completion_work *completion_work =
2489                container_of(w, struct completion_work, w);
2490
2491        complete(&completion_work->done);
2492        return 0;
2493}
2494
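/* Wait until everything that was queued on @work_queue before this call has
 * been processed, by queueing a completion work item and waiting for it. */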
2495void drbd_flush_workqueue(struct drbd_work_queue *work_queue)
2496{
2497        struct completion_work completion_work;
2498
2499        completion_work.w.cb = w_complete;
2500        init_completion(&completion_work.done);
2501        drbd_queue_work(work_queue, &completion_work.w);
2502        wait_for_completion(&completion_work.done);
2503}
2504
2505struct drbd_resource *drbd_find_resource(const char *name)
2506{
2507        struct drbd_resource *resource;
2508
2509        if (!name || !name[0])
2510                return NULL;
2511
2512        rcu_read_lock();
2513        for_each_resource_rcu(resource, &drbd_resources) {
2514                if (!strcmp(resource->name, name)) {
2515                        kref_get(&resource->kref);
2516                        goto found;
2517                }
2518        }
2519        resource = NULL;
2520found:
2521        rcu_read_unlock();
2522        return resource;
2523}
2524
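/* Find the connection matching the given local and peer address pair.
 * On success a reference is taken on the connection; the caller is
 * responsible for dropping it again with kref_put(). */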
2525struct drbd_connection *conn_get_by_addrs(void *my_addr, int my_addr_len,
2526                                     void *peer_addr, int peer_addr_len)
2527{
2528        struct drbd_resource *resource;
2529        struct drbd_connection *connection;
2530
2531        rcu_read_lock();
2532        for_each_resource_rcu(resource, &drbd_resources) {
2533                for_each_connection_rcu(connection, resource) {
2534                        if (connection->my_addr_len == my_addr_len &&
2535                            connection->peer_addr_len == peer_addr_len &&
2536                            !memcmp(&connection->my_addr, my_addr, my_addr_len) &&
2537                            !memcmp(&connection->peer_addr, peer_addr, peer_addr_len)) {
2538                                kref_get(&connection->kref);
2539                                goto found;
2540                        }
2541                }
2542        }
2543        connection = NULL;
2544found:
2545        rcu_read_unlock();
2546        return connection;
2547}
2548
2549static int drbd_alloc_socket(struct drbd_socket *socket)
2550{
2551        socket->rbuf = (void *) __get_free_page(GFP_KERNEL);
2552        if (!socket->rbuf)
2553                return -ENOMEM;
2554        socket->sbuf = (void *) __get_free_page(GFP_KERNEL);
2555        if (!socket->sbuf)
2556                return -ENOMEM;
2557        return 0;
2558}
2559
2560static void drbd_free_socket(struct drbd_socket *socket)
2561{
2562        free_page((unsigned long) socket->sbuf);
2563        free_page((unsigned long) socket->rbuf);
2564}
2565
2566void conn_free_crypto(struct drbd_connection *connection)
2567{
2568        drbd_free_sock(connection);
2569
2570        crypto_free_shash(connection->csums_tfm);
2571        crypto_free_shash(connection->verify_tfm);
2572        crypto_free_shash(connection->cram_hmac_tfm);
2573        crypto_free_shash(connection->integrity_tfm);
2574        crypto_free_shash(connection->peer_integrity_tfm);
2575        kfree(connection->int_dig_in);
2576        kfree(connection->int_dig_vv);
2577
2578        connection->csums_tfm = NULL;
2579        connection->verify_tfm = NULL;
2580        connection->cram_hmac_tfm = NULL;
2581        connection->integrity_tfm = NULL;
2582        connection->peer_integrity_tfm = NULL;
2583        connection->int_dig_in = NULL;
2584        connection->int_dig_vv = NULL;
2585}
2586
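/* Apply @res_opts to @resource.  The CPU mask string is parsed (bits beyond
 * nr_cpu_ids are truncated with a warning); if the resulting mask changed,
 * the receiver, ack_receiver and worker threads are flagged to re-apply
 * their CPU affinity on their next iteration. */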
2587int set_resource_options(struct drbd_resource *resource, struct res_opts *res_opts)
2588{
2589        struct drbd_connection *connection;
2590        cpumask_var_t new_cpu_mask;
2591        int err;
2592
2593        if (!zalloc_cpumask_var(&new_cpu_mask, GFP_KERNEL))
2594                return -ENOMEM;
2595
2596        /* silently ignore cpu mask on UP kernel */
2597        if (nr_cpu_ids > 1 && res_opts->cpu_mask[0] != 0) {
2598                err = bitmap_parse(res_opts->cpu_mask, DRBD_CPU_MASK_SIZE,
2599                                   cpumask_bits(new_cpu_mask), nr_cpu_ids);
2600                if (err == -EOVERFLOW) {
2601                        /* So what. mask it out. */
2602                        cpumask_var_t tmp_cpu_mask;
2603                        if (zalloc_cpumask_var(&tmp_cpu_mask, GFP_KERNEL)) {
2604                                cpumask_setall(tmp_cpu_mask);
2605                                cpumask_and(new_cpu_mask, new_cpu_mask, tmp_cpu_mask);
2606                                drbd_warn(resource, "Overflow in bitmap_parse(%.12s%s), truncating to %u bits\n",
2607                                        res_opts->cpu_mask,
2608                                        strlen(res_opts->cpu_mask) > 12 ? "..." : "",
2609                                        nr_cpu_ids);
2610                                free_cpumask_var(tmp_cpu_mask);
2611                                err = 0;
2612                        }
2613                }
2614                if (err) {
2615                        drbd_warn(resource, "bitmap_parse() failed with %d\n", err);
2616                        /* retcode = ERR_CPU_MASK_PARSE; */
2617                        goto fail;
2618                }
2619        }
2620        resource->res_opts = *res_opts;
2621        if (cpumask_empty(new_cpu_mask))
2622                drbd_calc_cpu_mask(&new_cpu_mask);
2623        if (!cpumask_equal(resource->cpu_mask, new_cpu_mask)) {
2624                cpumask_copy(resource->cpu_mask, new_cpu_mask);
2625                for_each_connection_rcu(connection, resource) {
2626                        connection->receiver.reset_cpu_mask = 1;
2627                        connection->ack_receiver.reset_cpu_mask = 1;
2628                        connection->worker.reset_cpu_mask = 1;
2629                }
2630        }
2631        err = 0;
2632
2633fail:
2634        free_cpumask_var(new_cpu_mask);
2635        return err;
2636
2637}
2638
2639struct drbd_resource *drbd_create_resource(const char *name)
2640{
2641        struct drbd_resource *resource;
2642
2643        resource = kzalloc(sizeof(struct drbd_resource), GFP_KERNEL);
2644        if (!resource)
2645                goto fail;
2646        resource->name = kstrdup(name, GFP_KERNEL);
2647        if (!resource->name)
2648                goto fail_free_resource;
2649        if (!zalloc_cpumask_var(&resource->cpu_mask, GFP_KERNEL))
2650                goto fail_free_name;
2651        kref_init(&resource->kref);
2652        idr_init(&resource->devices);
2653        INIT_LIST_HEAD(&resource->connections);
2654        resource->write_ordering = WO_BDEV_FLUSH;
2655        list_add_tail_rcu(&resource->resources, &drbd_resources);
2656        mutex_init(&resource->conf_update);
2657        mutex_init(&resource->adm_mutex);
2658        spin_lock_init(&resource->req_lock);
2659        drbd_debugfs_resource_add(resource);
2660        return resource;
2661
2662fail_free_name:
2663        kfree(resource->name);
2664fail_free_resource:
2665        kfree(resource);
2666fail:
2667        return NULL;
2668}
2669
2670/* caller must be under adm_mutex */
2671struct drbd_connection *conn_create(const char *name, struct res_opts *res_opts)
2672{
2673        struct drbd_resource *resource;
2674        struct drbd_connection *connection;
2675
2676        connection = kzalloc(sizeof(struct drbd_connection), GFP_KERNEL);
2677        if (!connection)
2678                return NULL;
2679
2680        if (drbd_alloc_socket(&connection->data))
2681                goto fail;
2682        if (drbd_alloc_socket(&connection->meta))
2683                goto fail;
2684
2685        connection->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
2686        if (!connection->current_epoch)
2687                goto fail;
2688
2689        INIT_LIST_HEAD(&connection->transfer_log);
2690
2691        INIT_LIST_HEAD(&connection->current_epoch->list);
2692        connection->epochs = 1;
2693        spin_lock_init(&connection->epoch_lock);
2694
2695        connection->send.seen_any_write_yet = false;
2696        connection->send.current_epoch_nr = 0;
2697        connection->send.current_epoch_writes = 0;
2698
2699        resource = drbd_create_resource(name);
2700        if (!resource)
2701                goto fail;
2702
2703        connection->cstate = C_STANDALONE;
2704        mutex_init(&connection->cstate_mutex);
2705        init_waitqueue_head(&connection->ping_wait);
2706        idr_init(&connection->peer_devices);
2707
2708        drbd_init_workqueue(&connection->sender_work);
2709        mutex_init(&connection->data.mutex);
2710        mutex_init(&connection->meta.mutex);
2711
2712        drbd_thread_init(resource, &connection->receiver, drbd_receiver, "receiver");
2713        connection->receiver.connection = connection;
2714        drbd_thread_init(resource, &connection->worker, drbd_worker, "worker");
2715        connection->worker.connection = connection;
2716        drbd_thread_init(resource, &connection->ack_receiver, drbd_ack_receiver, "ack_recv");
2717        connection->ack_receiver.connection = connection;
2718
2719        kref_init(&connection->kref);
2720
2721        connection->resource = resource;
2722
2723        if (set_resource_options(resource, res_opts))
2724                goto fail_resource;
2725
2726        kref_get(&resource->kref);
2727        list_add_tail_rcu(&connection->connections, &resource->connections);
2728        drbd_debugfs_connection_add(connection);
2729        return connection;
2730
2731fail_resource:
2732        list_del(&resource->resources);
2733        drbd_free_resource(resource);
2734fail:
2735        kfree(connection->current_epoch);
2736        drbd_free_socket(&connection->meta);
2737        drbd_free_socket(&connection->data);
2738        kfree(connection);
2739        return NULL;
2740}
2741
2742void drbd_destroy_connection(struct kref *kref)
2743{
2744        struct drbd_connection *connection = container_of(kref, struct drbd_connection, kref);
2745        struct drbd_resource *resource = connection->resource;
2746
2747        if (atomic_read(&connection->current_epoch->epoch_size) !=  0)
2748                drbd_err(connection, "epoch_size:%d\n", atomic_read(&connection->current_epoch->epoch_size));
2749        kfree(connection->current_epoch);
2750
2751        idr_destroy(&connection->peer_devices);
2752
2753        drbd_free_socket(&connection->meta);
2754        drbd_free_socket(&connection->data);
2755        kfree(connection->int_dig_in);
2756        kfree(connection->int_dig_vv);
2757        memset(connection, 0xfc, sizeof(*connection));
2758        kfree(connection);
2759        kref_put(&resource->kref, drbd_destroy_resource);
2760}
2761
2762static int init_submitter(struct drbd_device *device)
2763{
2764        /* opencoded create_singlethread_workqueue(),
2765         * to be able to say "drbd%d", ..., minor */
2766        device->submit.wq =
2767                alloc_ordered_workqueue("drbd%u_submit", WQ_MEM_RECLAIM, device->minor);
2768        if (!device->submit.wq)
2769                return -ENOMEM;
2770
2771        INIT_WORK(&device->submit.worker, do_submit);
2772        INIT_LIST_HEAD(&device->submit.writes);
2773        return 0;
2774}
2775
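/* Create and register a new device (minor) for @adm_ctx->resource: allocate
 * the request queue, gendisk, metadata I/O page and bitmap, create one
 * peer_device per existing connection, insert everything into the idr trees
 * and finally add the disk. */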
2776enum drbd_ret_code drbd_create_device(struct drbd_config_context *adm_ctx, unsigned int minor)
2777{
2778        struct drbd_resource *resource = adm_ctx->resource;
2779        struct drbd_connection *connection;
2780        struct drbd_device *device;
2781        struct drbd_peer_device *peer_device, *tmp_peer_device;
2782        struct gendisk *disk;
2783        struct request_queue *q;
2784        int id;
2785        int vnr = adm_ctx->volume;
2786        enum drbd_ret_code err = ERR_NOMEM;
2787
2788        device = minor_to_device(minor);
2789        if (device)
2790                return ERR_MINOR_OR_VOLUME_EXISTS;
2791
2792        /* GFP_KERNEL, we are outside of all write-out paths */
2793        device = kzalloc(sizeof(struct drbd_device), GFP_KERNEL);
2794        if (!device)
2795                return ERR_NOMEM;
2796        kref_init(&device->kref);
2797
2798        kref_get(&resource->kref);
2799        device->resource = resource;
2800        device->minor = minor;
2801        device->vnr = vnr;
2802
2803        drbd_init_set_defaults(device);
2804
2805        q = blk_alloc_queue_node(GFP_KERNEL, NUMA_NO_NODE);
2806        if (!q)
2807                goto out_no_q;
2808        device->rq_queue = q;
2809        q->queuedata   = device;
2810
2811        disk = alloc_disk(1);
2812        if (!disk)
2813                goto out_no_disk;
2814        device->vdisk = disk;
2815
2816        set_disk_ro(disk, true);
2817
2818        disk->queue = q;
2819        disk->major = DRBD_MAJOR;
2820        disk->first_minor = minor;
2821        disk->fops = &drbd_ops;
2822        sprintf(disk->disk_name, "drbd%d", minor);
2823        disk->private_data = device;
2824
2825        device->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
2826        /* we have no partitions. we contain only ourselves. */
2827        device->this_bdev->bd_contains = device->this_bdev;
2828
2829        q->backing_dev_info->congested_fn = drbd_congested;
2830        q->backing_dev_info->congested_data = device;
2831
2832        blk_queue_make_request(q, drbd_make_request);
2833        blk_queue_write_cache(q, true, true);
2834        /* Setting max_hw_sectors to the "odd" value of 8 KiB here;
2835           this triggers a max_bio_size message upon first attach or connect. */
2836        blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
2837
2838        device->md_io.page = alloc_page(GFP_KERNEL);
2839        if (!device->md_io.page)
2840                goto out_no_io_page;
2841
2842        if (drbd_bm_init(device))
2843                goto out_no_bitmap;
2844        device->read_requests = RB_ROOT;
2845        device->write_requests = RB_ROOT;
2846
2847        id = idr_alloc(&drbd_devices, device, minor, minor + 1, GFP_KERNEL);
2848        if (id < 0) {
2849                if (id == -ENOSPC)
2850                        err = ERR_MINOR_OR_VOLUME_EXISTS;
2851                goto out_no_minor_idr;
2852        }
2853        kref_get(&device->kref);
2854
2855        id = idr_alloc(&resource->devices, device, vnr, vnr + 1, GFP_KERNEL);
2856        if (id < 0) {
2857                if (id == -ENOSPC)
2858                        err = ERR_MINOR_OR_VOLUME_EXISTS;
2859                goto out_idr_remove_minor;
2860        }
2861        kref_get(&device->kref);
2862
2863        INIT_LIST_HEAD(&device->peer_devices);
2864        INIT_LIST_HEAD(&device->pending_bitmap_io);
2865        for_each_connection(connection, resource) {
2866                peer_device = kzalloc(sizeof(struct drbd_peer_device), GFP_KERNEL);
2867                if (!peer_device)
2868                        goto out_idr_remove_from_resource;
2869                peer_device->connection = connection;
2870                peer_device->device = device;
2871
2872                list_add(&peer_device->peer_devices, &device->peer_devices);
2873                kref_get(&device->kref);
2874
2875                id = idr_alloc(&connection->peer_devices, peer_device, vnr, vnr + 1, GFP_KERNEL);
2876                if (id < 0) {
2877                        if (id == -ENOSPC)
2878                                err = ERR_INVALID_REQUEST;
2879                        goto out_idr_remove_from_resource;
2880                }
2881                kref_get(&connection->kref);
2882                INIT_WORK(&peer_device->send_acks_work, drbd_send_acks_wf);
2883        }
2884
2885        if (init_submitter(device)) {
2886                err = ERR_NOMEM;
2887                goto out_idr_remove_vol;
2888        }
2889
2890        add_disk(disk);
2891
2892        /* inherit the connection state */
2893        device->state.conn = first_connection(resource)->cstate;
2894        if (device->state.conn == C_WF_REPORT_PARAMS) {
2895                for_each_peer_device(peer_device, device)
2896                        drbd_connected(peer_device);
2897        }
2898        /* move to create_peer_device() */
2899        for_each_peer_device(peer_device, device)
2900                drbd_debugfs_peer_device_add(peer_device);
2901        drbd_debugfs_device_add(device);
2902        return NO_ERROR;
2903
2904out_idr_remove_vol:
2905        idr_remove(&connection->peer_devices, vnr);
2906out_idr_remove_from_resource:
2907        for_each_connection(connection, resource) {
2908                peer_device = idr_remove(&connection->peer_devices, vnr);
2909                if (peer_device)
2910                        kref_put(&connection->kref, drbd_destroy_connection);
2911        }
2912        for_each_peer_device_safe(peer_device, tmp_peer_device, device) {
2913                list_del(&peer_device->peer_devices);
2914                kfree(peer_device);
2915        }
2916        idr_remove(&resource->devices, vnr);
2917out_idr_remove_minor:
2918        idr_remove(&drbd_devices, minor);
2919        synchronize_rcu();
2920out_no_minor_idr:
2921        drbd_bm_cleanup(device);
2922out_no_bitmap:
2923        __free_page(device->md_io.page);
2924out_no_io_page:
2925        put_disk(disk);
2926out_no_disk:
2927        blk_cleanup_queue(q);
2928out_no_q:
2929        kref_put(&resource->kref, drbd_destroy_resource);
2930        kfree(device);
2931        return err;
2932}
2933
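/* Each idr_remove() below drops the reference that drbd_create_device() took
 * on behalf of the corresponding idr or peer_device entry; the final
 * kref_put() after synchronize_rcu() drops the initial reference from
 * kref_init(). */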
2934void drbd_delete_device(struct drbd_device *device)
2935{
2936        struct drbd_resource *resource = device->resource;
2937        struct drbd_connection *connection;
2938        struct drbd_peer_device *peer_device;
2939
2940        /* move to free_peer_device() */
2941        for_each_peer_device(peer_device, device)
2942                drbd_debugfs_peer_device_cleanup(peer_device);
2943        drbd_debugfs_device_cleanup(device);
2944        for_each_connection(connection, resource) {
2945                idr_remove(&connection->peer_devices, device->vnr);
2946                kref_put(&device->kref, drbd_destroy_device);
2947        }
2948        idr_remove(&resource->devices, device->vnr);
2949        kref_put(&device->kref, drbd_destroy_device);
2950        idr_remove(&drbd_devices, device_to_minor(device));
2951        kref_put(&device->kref, drbd_destroy_device);
2952        del_gendisk(device->vdisk);
2953        synchronize_rcu();
2954        kref_put(&device->kref, drbd_destroy_device);
2955}
2956
2957static int __init drbd_init(void)
2958{
2959        int err;
2960
2961        if (drbd_minor_count < DRBD_MINOR_COUNT_MIN || drbd_minor_count > DRBD_MINOR_COUNT_MAX) {
2962                pr_err("invalid minor_count (%d)\n", drbd_minor_count);
2963#ifdef MODULE
2964                return -EINVAL;
2965#else
2966                drbd_minor_count = DRBD_MINOR_COUNT_DEF;
2967#endif
2968        }
2969
2970        err = register_blkdev(DRBD_MAJOR, "drbd");
2971        if (err) {
2972                pr_err("unable to register block device major %d\n",
2973                       DRBD_MAJOR);
2974                return err;
2975        }
2976
2977        /*
2978         * allocate all necessary structs
2979         */
2980        init_waitqueue_head(&drbd_pp_wait);
2981
2982        drbd_proc = NULL; /* play safe for drbd_cleanup */
2983        idr_init(&drbd_devices);
2984
2985        mutex_init(&resources_mutex);
2986        INIT_LIST_HEAD(&drbd_resources);
2987
2988        err = drbd_genl_register();
2989        if (err) {
2990                pr_err("unable to register generic netlink family\n");
2991                goto fail;
2992        }
2993
2994        err = drbd_create_mempools();
2995        if (err)
2996                goto fail;
2997
2998        err = -ENOMEM;
2999        drbd_proc = proc_create_single("drbd", S_IFREG | 0444 , NULL, drbd_seq_show);
3000        if (!drbd_proc) {
3001                pr_err("unable to register proc file\n");
3002                goto fail;
3003        }
3004
3005        retry.wq = create_singlethread_workqueue("drbd-reissue");
3006        if (!retry.wq) {
3007                pr_err("unable to create retry workqueue\n");
3008                goto fail;
3009        }
3010        INIT_WORK(&retry.worker, do_retry);
3011        spin_lock_init(&retry.lock);
3012        INIT_LIST_HEAD(&retry.writes);
3013
3014        drbd_debugfs_init();
3015
3016        pr_info("initialized. "
3017               "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
3018               API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
3019        pr_info("%s\n", drbd_buildtag());
3020        pr_info("registered as block device major %d\n", DRBD_MAJOR);
3021        return 0; /* Success! */
3022
3023fail:
3024        drbd_cleanup();
3025        if (err == -ENOMEM)
3026                pr_err("ran out of memory\n");
3027        else
3028                pr_err("initialization failure\n");
3029        return err;
3030}
3031
3032static void drbd_free_one_sock(struct drbd_socket *ds)
3033{
3034        struct socket *s;
3035        mutex_lock(&ds->mutex);
3036        s = ds->socket;
3037        ds->socket = NULL;
3038        mutex_unlock(&ds->mutex);
3039        if (s) {
3040                /* so debugfs does not need to mutex_lock() */
3041                synchronize_rcu();
3042                kernel_sock_shutdown(s, SHUT_RDWR);
3043                sock_release(s);
3044        }
3045}
3046
3047void drbd_free_sock(struct drbd_connection *connection)
3048{
3049        if (connection->data.socket)
3050                drbd_free_one_sock(&connection->data);
3051        if (connection->meta.socket)
3052                drbd_free_one_sock(&connection->meta);
3053}
3054
3055/* meta data management */
3056
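/* Note on the loop below: the device is pinned with kref_get() before the RCU
 * read lock is dropped, because drbd_md_sync() may sleep; iteration of the
 * idr then resumes under a fresh rcu_read_lock(). */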
3057void conn_md_sync(struct drbd_connection *connection)
3058{
3059        struct drbd_peer_device *peer_device;
3060        int vnr;
3061
3062        rcu_read_lock();
3063        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
3064                struct drbd_device *device = peer_device->device;
3065
3066                kref_get(&device->kref);
3067                rcu_read_unlock();
3068                drbd_md_sync(device);
3069                kref_put(&device->kref, drbd_destroy_device);
3070                rcu_read_lock();
3071        }
3072        rcu_read_unlock();
3073}
3074
3075/* aligned to 4 KiB */
3076struct meta_data_on_disk {
3077        u64 la_size_sect;      /* last agreed size. */
3078        u64 uuid[UI_SIZE];   /* UUIDs. */
3079        u64 device_uuid;
3080        u64 reserved_u64_1;
3081        u32 flags;             /* MDF */
3082        u32 magic;
3083        u32 md_size_sect;
3084        u32 al_offset;         /* offset to this block */
3085        u32 al_nr_extents;     /* important for restoring the AL (userspace) */
3086              /* `-- act_log->nr_elements <-- ldev->dc.al_extents */
3087        u32 bm_offset;         /* offset to the bitmap, from here */
3088        u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
3089        u32 la_peer_max_bio_size;   /* last peer max_bio_size */
3090
3091        /* see al_tr_number_to_on_disk_sector() */
3092        u32 al_stripes;
3093        u32 al_stripe_size_4k;
3094
3095        u8 reserved_u8[4096 - (7*8 + 10*4)];
3096} __packed;
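/* Size check: the 7 u64 members (la_size_sect, uuid[4], device_uuid,
 * reserved_u64_1) take 56 bytes and the 10 u32 members take 40 bytes, so
 * reserved_u8[4096 - 96] pads the structure to exactly 4096 bytes, which the
 * BUILD_BUG_ON() in drbd_md_sync() asserts. */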
3097
3098
3099
3100void drbd_md_write(struct drbd_device *device, void *b)
3101{
3102        struct meta_data_on_disk *buffer = b;
3103        sector_t sector;
3104        int i;
3105
3106        memset(buffer, 0, sizeof(*buffer));
3107
3108        buffer->la_size_sect = cpu_to_be64(drbd_get_capacity(device->this_bdev));
3109        for (i = UI_CURRENT; i < UI_SIZE; i++)
3110                buffer->uuid[i] = cpu_to_be64(device->ldev->md.uuid[i]);
3111        buffer->flags = cpu_to_be32(device->ldev->md.flags);
3112        buffer->magic = cpu_to_be32(DRBD_MD_MAGIC_84_UNCLEAN);
3113
3114        buffer->md_size_sect  = cpu_to_be32(device->ldev->md.md_size_sect);
3115        buffer->al_offset     = cpu_to_be32(device->ldev->md.al_offset);
3116        buffer->al_nr_extents = cpu_to_be32(device->act_log->nr_elements);
3117        buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
3118        buffer->device_uuid = cpu_to_be64(device->ldev->md.device_uuid);
3119
3120        buffer->bm_offset = cpu_to_be32(device->ldev->md.bm_offset);
3121        buffer->la_peer_max_bio_size = cpu_to_be32(device->peer_max_bio_size);
3122
3123        buffer->al_stripes = cpu_to_be32(device->ldev->md.al_stripes);
3124        buffer->al_stripe_size_4k = cpu_to_be32(device->ldev->md.al_stripe_size_4k);
3125
3126        D_ASSERT(device, drbd_md_ss(device->ldev) == device->ldev->md.md_offset);
3127        sector = device->ldev->md.md_offset;
3128
3129        if (drbd_md_sync_page_io(device, device->ldev, sector, REQ_OP_WRITE)) {
3130                /* this was only a best-effort attempt anyway ... */
3131                drbd_err(device, "meta data update failed!\n");
3132                drbd_chk_io_error(device, 1, DRBD_META_IO_ERROR);
3133        }
3134}
3135
3136/**
3137 * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
3138 * @device:     DRBD device.
3139 */
3140void drbd_md_sync(struct drbd_device *device)
3141{
3142        struct meta_data_on_disk *buffer;
3143
3144        /* Don't accidentally change the DRBD meta data layout. */
3145        BUILD_BUG_ON(UI_SIZE != 4);
3146        BUILD_BUG_ON(sizeof(struct meta_data_on_disk) != 4096);
3147
3148        del_timer(&device->md_sync_timer);
3149        /* timer may be rearmed by drbd_md_mark_dirty() now. */
3150        if (!test_and_clear_bit(MD_DIRTY, &device->flags))
3151                return;
3152
3153        /* We use D_FAILED here and not D_ATTACHING because we try to write
3154         * metadata even if we detach due to a disk failure! */
3155        if (!get_ldev_if_state(device, D_FAILED))
3156                return;
3157
3158        buffer = drbd_md_get_buffer(device, __func__);
3159        if (!buffer)
3160                goto out;
3161
3162        drbd_md_write(device, buffer);
3163
3164        /* Update device->ldev->md.la_size_sect,
3165         * since we just wrote it to the on-disk meta data. */
3166        device->ldev->md.la_size_sect = drbd_get_capacity(device->this_bdev);
3167
3168        drbd_md_put_buffer(device);
3169out:
3170        put_ldev(device);
3171}
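/* Typical calling pattern, as a rough sketch: code that changes in-core meta
 * data marks it dirty and either relies on the five second timer or forces
 * the write-out immediately:
 *
 *	drbd_md_mark_dirty(device);	// arms md_sync_timer
 *	...
 *	drbd_md_sync(device);		// writes now, if still MD_DIRTY
 */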
3172
3173static int check_activity_log_stripe_size(struct drbd_device *device,
3174                struct meta_data_on_disk *on_disk,
3175                struct drbd_md *in_core)
3176{
3177        u32 al_stripes = be32_to_cpu(on_disk->al_stripes);
3178        u32 al_stripe_size_4k = be32_to_cpu(on_disk->al_stripe_size_4k);
3179        u64 al_size_4k;
3180
3181        /* both not set: default to old fixed size activity log */
3182        if (al_stripes == 0 && al_stripe_size_4k == 0) {
3183                al_stripes = 1;
3184                al_stripe_size_4k = MD_32kB_SECT/8;
3185        }
3186
3187        /* some paranoia plausibility checks */
3188
3189        /* we need both values to be set */
3190        if (al_stripes == 0 || al_stripe_size_4k == 0)
3191                goto err;
3192
3193        al_size_4k = (u64)al_stripes * al_stripe_size_4k;
3194
3195        /* Upper limit of the activity log area, to avoid potential overflow
3196         * problems in al_tr_number_to_on_disk_sector(). Since right now more
3197         * than 72 * 4k blocks total only increases the amount of history,
3198         * limiting this arbitrarily to 16 GB is not a real limitation ;-)  */
3199        if (al_size_4k > (16 * 1024 * 1024/4))
3200                goto err;
3201
3202        /* Lower limit: we need at least 8 transaction slots (32kB)
3203         * to not break existing setups */
3204        if (al_size_4k < MD_32kB_SECT/8)
3205                goto err;
3206
3207        in_core->al_stripe_size_4k = al_stripe_size_4k;
3208        in_core->al_stripes = al_stripes;
3209        in_core->al_size_4k = al_size_4k;
3210
3211        return 0;
3212err:
3213        drbd_err(device, "invalid activity log striping: al_stripes=%u, al_stripe_size_4k=%u\n",
3214                        al_stripes, al_stripe_size_4k);
3215        return -EINVAL;
3216}
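/* Worked example of the bounds above, assuming MD_32kB_SECT is 32 KiB
 * expressed in 512-byte sectors (i.e. 64): the default and lower limit is
 * one stripe of 64/8 = 8 4k-blocks = 32 kB (8 transaction slots), while the
 * upper limit of 16*1024*1024/4 4k-blocks corresponds to 16 GB of activity
 * log area. */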
3217
3218static int check_offsets_and_sizes(struct drbd_device *device, struct drbd_backing_dev *bdev)
3219{
3220        sector_t capacity = drbd_get_capacity(bdev->md_bdev);
3221        struct drbd_md *in_core = &bdev->md;
3222        s32 on_disk_al_sect;
3223        s32 on_disk_bm_sect;
3224
3225        /* The on-disk size of the activity log, calculated from offsets, and
3226         * the size of the activity log calculated from the stripe settings,
3227         * should match.
3228         * Though we could relax this a bit: it is ok if the striped activity log
3229         * fits in the available on-disk activity log size.
3230         * Right now, that would break how resize is implemented.
3231         * TODO: make drbd_determine_dev_size() (and the drbdmeta tool) aware
3232         * of possible unused padding space in the on-disk layout. */
3233        if (in_core->al_offset < 0) {
3234                if (in_core->bm_offset > in_core->al_offset)
3235                        goto err;
3236                on_disk_al_sect = -in_core->al_offset;
3237                on_disk_bm_sect = in_core->al_offset - in_core->bm_offset;
3238        } else {
3239                if (in_core->al_offset != MD_4kB_SECT)
3240                        goto err;
3241                if (in_core->bm_offset < in_core->al_offset + in_core->al_size_4k * MD_4kB_SECT)
3242                        goto err;
3243
3244                on_disk_al_sect = in_core->bm_offset - MD_4kB_SECT;
3245                on_disk_bm_sect = in_core->md_size_sect - in_core->bm_offset;
3246        }
3247
3248        /* old fixed size meta data is exactly that: fixed. */
3249        if (in_core->meta_dev_idx >= 0) {
3250                if (in_core->md_size_sect != MD_128MB_SECT
3251                ||  in_core->al_offset != MD_4kB_SECT
3252                ||  in_core->bm_offset != MD_4kB_SECT + MD_32kB_SECT
3253                ||  in_core->al_stripes != 1
3254                ||  in_core->al_stripe_size_4k != MD_32kB_SECT/8)
3255                        goto err;
3256        }
3257
3258        if (capacity < in_core->md_size_sect)
3259                goto err;
3260        if (capacity - in_core->md_size_sect < drbd_md_first_sector(bdev))
3261                goto err;
3262
3263        /* should be aligned, and at least 32k */
3264        if ((on_disk_al_sect & 7) || (on_disk_al_sect < MD_32kB_SECT))
3265                goto err;
3266
3267        /* should fit (for now: exactly) into the available on-disk space;
3268         * overflow prevention is in check_activity_log_stripe_size() above. */
3269        if (on_disk_al_sect != in_core->al_size_4k * MD_4kB_SECT)
3270                goto err;
3271
3272        /* again, should be aligned */
3273        if (in_core->bm_offset & 7)
3274                goto err;
3275
3276        /* FIXME check for device grow with flex external meta data? */
3277
3278        /* can the available bitmap space cover the last agreed device size? */
3279        if (on_disk_bm_sect < (in_core->la_size_sect+7)/MD_4kB_SECT/8/512)
3280                goto err;
3281
3282        return 0;
3283
3284err:
3285        drbd_err(device, "meta data offsets don't make sense: idx=%d "
3286                        "al_s=%u, al_sz4k=%u, al_offset=%d, bm_offset=%d, "
3287                        "md_size_sect=%u, la_size=%llu, md_capacity=%llu\n",
3288                        in_core->meta_dev_idx,
3289                        in_core->al_stripes, in_core->al_stripe_size_4k,
3290                        in_core->al_offset, in_core->bm_offset, in_core->md_size_sect,
3291                        (unsigned long long)in_core->la_size_sect,
3292                        (unsigned long long)capacity);
3293
3294        return -EINVAL;
3295}
3296
3297
3298/**
3299 * drbd_md_read() - Reads in the meta data super block
3300 * @device:     DRBD device.
3301 * @bdev:       Device from which the meta data should be read in.
3302 *
3303 * Returns NO_ERROR on success, or an appropriate enum drbd_ret_code if
3304 * something goes wrong.
3305 *
3306 * Called exactly once during drbd_adm_attach(), while still being D_DISKLESS,
3307 * even before @bdev is assigned to @device->ldev.
3308 */
3309int drbd_md_read(struct drbd_device *device, struct drbd_backing_dev *bdev)
3310{
3311        struct meta_data_on_disk *buffer;
3312        u32 magic, flags;
3313        int i, rv = NO_ERROR;
3314
3315        if (device->state.disk != D_DISKLESS)
3316                return ERR_DISK_CONFIGURED;
3317
3318        buffer = drbd_md_get_buffer(device, __func__);
3319        if (!buffer)
3320                return ERR_NOMEM;
3321
3322        /* First, figure out where our meta data superblock is located,
3323         * and read it. */
3324        bdev->md.meta_dev_idx = bdev->disk_conf->meta_dev_idx;
3325        bdev->md.md_offset = drbd_md_ss(bdev);
3326        /* Even for (flexible or indexed) external meta data,
3327         * initially restrict ourselves to the 4k superblock for now.
3328         * This affects the paranoia out-of-range access check in drbd_md_sync_page_io(). */
3329        bdev->md.md_size_sect = 8;
3330
3331        if (drbd_md_sync_page_io(device, bdev, bdev->md.md_offset,
3332                                 REQ_OP_READ)) {
3333                /* NOTE: can't do normal error processing here as this is
3334                   called BEFORE disk is attached */
3335                drbd_err(device, "Error while reading metadata.\n");
3336                rv = ERR_IO_MD_DISK;
3337                goto err;
3338        }
3339
3340        magic = be32_to_cpu(buffer->magic);
3341        flags = be32_to_cpu(buffer->flags);
3342        if (magic == DRBD_MD_MAGIC_84_UNCLEAN ||
3343            (magic == DRBD_MD_MAGIC_08 && !(flags & MDF_AL_CLEAN))) {
3344                /* btw: that's Activity Log clean, not "all" clean. */
3345                drbd_err(device, "Found unclean meta data. Did you \"drbdadm apply-al\"?\n");
3346                rv = ERR_MD_UNCLEAN;
3347                goto err;
3348        }
3349
3350        rv = ERR_MD_INVALID;
3351        if (magic != DRBD_MD_MAGIC_08) {
3352                if (magic == DRBD_MD_MAGIC_07)
3353                        drbd_err(device, "Found old (0.7) meta data magic. Did you \"drbdadm create-md\"?\n");
3354                else
3355                        drbd_err(device, "Meta data magic not found. Did you \"drbdadm create-md\"?\n");
3356                goto err;
3357        }
3358
3359        if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
3360                drbd_err(device, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
3361                    be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
3362                goto err;
3363        }
3364
3365
3366        /* convert to in_core endian */
3367        bdev->md.la_size_sect = be64_to_cpu(buffer->la_size_sect);
3368        for (i = UI_CURRENT; i < UI_SIZE; i++)
3369                bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
3370        bdev->md.flags = be32_to_cpu(buffer->flags);
3371        bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
3372
3373        bdev->md.md_size_sect = be32_to_cpu(buffer->md_size_sect);
3374        bdev->md.al_offset = be32_to_cpu(buffer->al_offset);
3375        bdev->md.bm_offset = be32_to_cpu(buffer->bm_offset);
3376
3377        if (check_activity_log_stripe_size(device, buffer, &bdev->md))
3378                goto err;
3379        if (check_offsets_and_sizes(device, bdev))
3380                goto err;
3381
3382        if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
3383                drbd_err(device, "unexpected bm_offset: %d (expected %d)\n",
3384                    be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
3385                goto err;
3386        }
3387        if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
3388                drbd_err(device, "unexpected md_size: %u (expected %u)\n",
3389                    be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
3390                goto err;
3391        }
3392
3393        rv = NO_ERROR;
3394
3395        spin_lock_irq(&device->resource->req_lock);
3396        if (device->state.conn < C_CONNECTED) {
3397                unsigned int peer;
3398                peer = be32_to_cpu(buffer->la_peer_max_bio_size);
3399                peer = max(peer, DRBD_MAX_BIO_SIZE_SAFE);
3400                device->peer_max_bio_size = peer;
3401        }
3402        spin_unlock_irq(&device->resource->req_lock);
3403
3404 err:
3405        drbd_md_put_buffer(device);
3406
3407        return rv;
3408}
3409
3410/**
3411 * drbd_md_mark_dirty() - Mark meta data super block as dirty
3412 * @device:     DRBD device.
3413 *
3414 * Call this function if you change anything that should be written to
3415 * the meta-data super block. This function sets MD_DIRTY and starts a
3416 * timer that ensures drbd_md_sync() gets called within five seconds.
3417 */
3418#ifdef DEBUG
3419void drbd_md_mark_dirty_(struct drbd_device *device, unsigned int line, const char *func)
3420{
3421        if (!test_and_set_bit(MD_DIRTY, &device->flags)) {
3422                mod_timer(&device->md_sync_timer, jiffies + HZ);
3423                device->last_md_mark_dirty.line = line;
3424                device->last_md_mark_dirty.func = func;
3425        }
3426}
3427#else
3428void drbd_md_mark_dirty(struct drbd_device *device)
3429{
3430        if (!test_and_set_bit(MD_DIRTY, &device->flags))
3431                mod_timer(&device->md_sync_timer, jiffies + 5*HZ);
3432}
3433#endif
3434
3435void drbd_uuid_move_history(struct drbd_device *device) __must_hold(local)
3436{
3437        int i;
3438
3439        for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
3440                device->ldev->md.uuid[i+1] = device->ldev->md.uuid[i];
3441}
3442
3443void __drbd_uuid_set(struct drbd_device *device, int idx, u64 val) __must_hold(local)
3444{
3445        if (idx == UI_CURRENT) {
3446                if (device->state.role == R_PRIMARY)
3447                        val |= 1;
3448                else
3449                        val &= ~((u64)1);
3450
3451                drbd_set_ed_uuid(device, val);
3452        }
3453
3454        device->ldev->md.uuid[idx] = val;
3455        drbd_md_mark_dirty(device);
3456}
3457
3458void _drbd_uuid_set(struct drbd_device *device, int idx, u64 val) __must_hold(local)
3459{
3460        unsigned long flags;
3461        spin_lock_irqsave(&device->ldev->md.uuid_lock, flags);
3462        __drbd_uuid_set(device, idx, val);
3463        spin_unlock_irqrestore(&device->ldev->md.uuid_lock, flags);
3464}
3465
3466void drbd_uuid_set(struct drbd_device *device, int idx, u64 val) __must_hold(local)
3467{
3468        unsigned long flags;
3469        spin_lock_irqsave(&device->ldev->md.uuid_lock, flags);
3470        if (device->ldev->md.uuid[idx]) {
3471                drbd_uuid_move_history(device);
3472                device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[idx];
3473        }
3474        __drbd_uuid_set(device, idx, val);
3475        spin_unlock_irqrestore(&device->ldev->md.uuid_lock, flags);
3476}
3477
3478/**
3479 * drbd_uuid_new_current() - Creates a new current UUID
3480 * @device:     DRBD device.
3481 *
3482 * Creates a new current UUID, and rotates the old current UUID into
3483 * the bitmap slot. Causes an incremental resync upon next connect.
3484 */
3485void drbd_uuid_new_current(struct drbd_device *device) __must_hold(local)
3486{
3487        u64 val;
3488        unsigned long long bm_uuid;
3489
3490        get_random_bytes(&val, sizeof(u64));
3491
3492        spin_lock_irq(&device->ldev->md.uuid_lock);
3493        bm_uuid = device->ldev->md.uuid[UI_BITMAP];
3494
3495        if (bm_uuid)
3496                drbd_warn(device, "bm UUID was already set: %llX\n", bm_uuid);
3497
3498        device->ldev->md.uuid[UI_BITMAP] = device->ldev->md.uuid[UI_CURRENT];
3499        __drbd_uuid_set(device, UI_CURRENT, val);
3500        spin_unlock_irq(&device->ldev->md.uuid_lock);
3501
3502        drbd_print_uuids(device, "new current UUID");
3503        /* get it to stable storage _now_ */
3504        drbd_md_sync(device);
3505}
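/* Effect of the rotation above, sketched with illustrative values:
 *
 *	before:	CURRENT = A,  BITMAP = 0,  HISTORY = [H0, H1, ...]
 *	after:	CURRENT = R,  BITMAP = A,  HISTORY = [H0, H1, ...]
 *
 * where R is the fresh random value (with the primary/secondary bit adjusted
 * by __drbd_uuid_set()). A later drbd_uuid_set_bm(device, 0) would push A
 * into UI_HISTORY_START and clear the bitmap slot again. */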
3506
3507void drbd_uuid_set_bm(struct drbd_device *device, u64 val) __must_hold(local)
3508{
3509        unsigned long flags;
3510        if (device->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
3511                return;
3512
3513        spin_lock_irqsave(&device->ldev->md.uuid_lock, flags);
3514        if (val == 0) {
3515                drbd_uuid_move_history(device);
3516                device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
3517                device->ldev->md.uuid[UI_BITMAP] = 0;
3518        } else {
3519                unsigned long long bm_uuid = device->ldev->md.uuid[UI_BITMAP];
3520                if (bm_uuid)
3521                        drbd_warn(device, "bm UUID was already set: %llX\n", bm_uuid);
3522
3523                device->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
3524        }
3525        spin_unlock_irqrestore(&device->ldev->md.uuid_lock, flags);
3526
3527        drbd_md_mark_dirty(device);
3528}
3529
3530/**
3531 * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3532 * @device:     DRBD device.
3533 *
3534 * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
3535 */
3536int drbd_bmio_set_n_write(struct drbd_device *device) __must_hold(local)
3537{
3538        int rv = -EIO;
3539
3540        drbd_md_set_flag(device, MDF_FULL_SYNC);
3541        drbd_md_sync(device);
3542        drbd_bm_set_all(device);
3543
3544        rv = drbd_bm_write(device);
3545
3546        if (!rv) {
3547                drbd_md_clear_flag(device, MDF_FULL_SYNC);
3548                drbd_md_sync(device);
3549        }
3550
3551        return rv;
3552}
3553
3554/**
3555 * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3556 * @device:     DRBD device.
3557 *
3558 * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3559 */
3560int drbd_bmio_clear_n_write(struct drbd_device *device) __must_hold(local)
3561{
3562        drbd_resume_al(device);
3563        drbd_bm_clear_all(device);
3564        return drbd_bm_write(device);
3565}
3566
3567static int w_bitmap_io(struct drbd_work *w, int unused)
3568{
3569        struct drbd_device *device =
3570                container_of(w, struct drbd_device, bm_io_work.w);
3571        struct bm_io_work *work = &device->bm_io_work;
3572        int rv = -EIO;
3573
3574        if (work->flags != BM_LOCKED_CHANGE_ALLOWED) {
3575                int cnt = atomic_read(&device->ap_bio_cnt);
3576                if (cnt)
3577                        drbd_err(device, "FIXME: ap_bio_cnt %d, expected 0; queued for '%s'\n",
3578                                        cnt, work->why);
3579        }
3580
3581        if (get_ldev(device)) {
3582                drbd_bm_lock(device, work->why, work->flags);
3583                rv = work->io_fn(device);
3584                drbd_bm_unlock(device);
3585                put_ldev(device);
3586        }
3587
3588        clear_bit_unlock(BITMAP_IO, &device->flags);
3589        wake_up(&device->misc_wait);
3590
3591        if (work->done)
3592                work->done(device, rv);
3593
3594        clear_bit(BITMAP_IO_QUEUED, &device->flags);
3595        work->why = NULL;
3596        work->flags = 0;
3597
3598        return 0;
3599}
3600
3601/**
3602 * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3603 * @device:     DRBD device.
3604 * @io_fn:      IO callback to be called when bitmap IO is possible
3605 * @done:       callback to be called after the bitmap IO was performed
3606 * @why:        Descriptive text of the reason for doing the IO
3607 *
3608 * While IO on the bitmap is in progress, application IO is frozen; this
3609 * ensures that drbd_set_out_of_sync() can not be called. This function MAY
3610 * ONLY be called from worker context. It MUST NOT be used while a previous
3611 * such work is still pending!
3612 *
3613 * Its worker function encloses the call to io_fn() in get_ldev() and
3614 * put_ldev().
3615 */
3616void drbd_queue_bitmap_io(struct drbd_device *device,
3617                          int (*io_fn)(struct drbd_device *),
3618                          void (*done)(struct drbd_device *, int),
3619                          char *why, enum bm_flag flags)
3620{
3621        D_ASSERT(device, current == first_peer_device(device)->connection->worker.task);
3622
3623        D_ASSERT(device, !test_bit(BITMAP_IO_QUEUED, &device->flags));
3624        D_ASSERT(device, !test_bit(BITMAP_IO, &device->flags));
3625        D_ASSERT(device, list_empty(&device->bm_io_work.w.list));
3626        if (device->bm_io_work.why)
3627                drbd_err(device, "FIXME going to queue '%s' but '%s' still pending?\n",
3628                        why, device->bm_io_work.why);
3629
3630        device->bm_io_work.io_fn = io_fn;
3631        device->bm_io_work.done = done;
3632        device->bm_io_work.why = why;
3633        device->bm_io_work.flags = flags;
3634
3635        spin_lock_irq(&device->resource->req_lock);
3636        set_bit(BITMAP_IO, &device->flags);
3637        /* don't wait for pending application IO if the caller indicates that
3638         * application IO does not conflict anyways. */
3639        if (flags == BM_LOCKED_CHANGE_ALLOWED || atomic_read(&device->ap_bio_cnt) == 0) {
3640                if (!test_and_set_bit(BITMAP_IO_QUEUED, &device->flags))
3641                        drbd_queue_work(&first_peer_device(device)->connection->sender_work,
3642                                        &device->bm_io_work.w);
3643        }
3644        spin_unlock_irq(&device->resource->req_lock);
3645}
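/* Usage sketch (illustrative only; the flag choice and "why" text are made
 * up): a caller on the worker that wants to set all bits and write out the
 * whole bitmap, without needing a completion callback, might do
 *
 *	drbd_queue_bitmap_io(device, &drbd_bmio_set_n_write, NULL,
 *			     "example full sync", BM_LOCKED_CHANGE_ALLOWED);
 *
 * w_bitmap_io() then runs io_fn under drbd_bm_lock()/get_ldev() and clears
 * BITMAP_IO when done. */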
3646
3647/**
3648 * drbd_bitmap_io() - Does an IO operation on the whole bitmap
3649 * @device:     DRBD device.
3650 * @io_fn:      IO callback to be called when bitmap IO is possible
3651 * @why:        Descriptive text of the reason for doing the IO
3652 *
3653 * Freezes application IO while the actual IO operation runs. This
3654 * function MUST NOT be called from worker context.
3655 */
3656int drbd_bitmap_io(struct drbd_device *device, int (*io_fn)(struct drbd_device *),
3657                char *why, enum bm_flag flags)
3658{
3659        /* Only suspend IO if some operation is supposed to be locked out */
3660        const bool do_suspend_io = flags & (BM_DONT_CLEAR|BM_DONT_SET|BM_DONT_TEST);
3661        int rv;
3662
3663        D_ASSERT(device, current != first_peer_device(device)->connection->worker.task);
3664
3665        if (do_suspend_io)
3666                drbd_suspend_io(device);
3667
3668        drbd_bm_lock(device, why, flags);
3669        rv = io_fn(device);
3670        drbd_bm_unlock(device);
3671
3672        if (do_suspend_io)
3673                drbd_resume_io(device);
3674
3675        return rv;
3676}
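/* The synchronous counterpart, again only as a sketch with illustrative
 * arguments, callable from process context other than the worker:
 *
 *	int rv = drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
 *				"example clear", BM_LOCKED_CHANGE_ALLOWED);
 *
 * Whether application IO is suspended for the duration depends on the
 * BM_DONT_* bits in the flags, as checked above. */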
3677
3678void drbd_md_set_flag(struct drbd_device *device, int flag) __must_hold(local)
3679{
3680        if ((device->ldev->md.flags & flag) != flag) {
3681                drbd_md_mark_dirty(device);
3682                device->ldev->md.flags |= flag;
3683        }
3684}
3685
3686void drbd_md_clear_flag(struct drbd_device *device, int flag) __must_hold(local)
3687{
3688        if ((device->ldev->md.flags & flag) != 0) {
3689                drbd_md_mark_dirty(device);
3690                device->ldev->md.flags &= ~flag;
3691        }
3692}
3693int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3694{
3695        return (bdev->md.flags & flag) != 0;
3696}
3697
3698static void md_sync_timer_fn(struct timer_list *t)
3699{
3700        struct drbd_device *device = from_timer(device, t, md_sync_timer);
3701        drbd_device_post_work(device, MD_SYNC);
3702}
3703
3704const char *cmdname(enum drbd_packet cmd)
3705{
3706        /* THINK may need to become several global tables
3707         * when we want to support more than
3708         * one PRO_VERSION */
3709        static const char *cmdnames[] = {
3710                [P_DATA]                = "Data",
3711                [P_WSAME]               = "WriteSame",
3712                [P_TRIM]                = "Trim",
3713                [P_DATA_REPLY]          = "DataReply",
3714                [P_RS_DATA_REPLY]       = "RSDataReply",
3715                [P_BARRIER]             = "Barrier",
3716                [P_BITMAP]              = "ReportBitMap",
3717                [P_BECOME_SYNC_TARGET]  = "BecomeSyncTarget",
3718                [P_BECOME_SYNC_SOURCE]  = "BecomeSyncSource",
3719                [P_UNPLUG_REMOTE]       = "UnplugRemote",
3720                [P_DATA_REQUEST]        = "DataRequest",
3721                [P_RS_DATA_REQUEST]     = "RSDataRequest",
3722                [P_SYNC_PARAM]          = "SyncParam",
3723                [P_SYNC_PARAM89]        = "SyncParam89",
3724                [P_PROTOCOL]            = "ReportProtocol",
3725                [P_UUIDS]               = "ReportUUIDs",
3726                [P_SIZES]               = "ReportSizes",
3727                [P_STATE]               = "ReportState",
3728                [P_SYNC_UUID]           = "ReportSyncUUID",
3729                [P_AUTH_CHALLENGE]      = "AuthChallenge",
3730                [P_AUTH_RESPONSE]       = "AuthResponse",
3731                [P_PING]                = "Ping",
3732                [P_PING_ACK]            = "PingAck",
3733                [P_RECV_ACK]            = "RecvAck",
3734                [P_WRITE_ACK]           = "WriteAck",
3735                [P_RS_WRITE_ACK]        = "RSWriteAck",
3736                [P_SUPERSEDED]          = "Superseded",
3737                [P_NEG_ACK]             = "NegAck",
3738                [P_NEG_DREPLY]          = "NegDReply",
3739                [P_NEG_RS_DREPLY]       = "NegRSDReply",
3740                [P_BARRIER_ACK]         = "BarrierAck",
3741                [P_STATE_CHG_REQ]       = "StateChgRequest",
3742                [P_STATE_CHG_REPLY]     = "StateChgReply",
3743                [P_OV_REQUEST]          = "OVRequest",
3744                [P_OV_REPLY]            = "OVReply",
3745                [P_OV_RESULT]           = "OVResult",
3746                [P_CSUM_RS_REQUEST]     = "CsumRSRequest",
3747                [P_RS_IS_IN_SYNC]       = "CsumRSIsInSync",
3748                [P_COMPRESSED_BITMAP]   = "CBitmap",
3749                [P_DELAY_PROBE]         = "DelayProbe",
3750                [P_OUT_OF_SYNC]         = "OutOfSync",
3751                [P_RETRY_WRITE]         = "RetryWrite",
3752                [P_RS_CANCEL]           = "RSCancel",
3753                [P_CONN_ST_CHG_REQ]     = "conn_st_chg_req",
3754                [P_CONN_ST_CHG_REPLY]   = "conn_st_chg_reply",
3756                [P_PROTOCOL_UPDATE]     = "protocol_update",
3757                [P_RS_THIN_REQ]         = "rs_thin_req",
3758                [P_RS_DEALLOCATED]      = "rs_deallocated",
3759
3760                /* enum drbd_packet, but not commands - obsoleted flags:
3761                 *      P_MAY_IGNORE
3762                 *      P_MAX_OPT_CMD
3763                 */
3764        };
3765
3766        /* too big for the array: 0xfffX */
3767        if (cmd == P_INITIAL_META)
3768                return "InitialMeta";
3769        if (cmd == P_INITIAL_DATA)
3770                return "InitialData";
3771        if (cmd == P_CONNECTION_FEATURES)
3772                return "ConnectionFeatures";
3773        if (cmd >= ARRAY_SIZE(cmdnames))
3774                return "Unknown";
3775        return cmdnames[cmd];
3776}
3777
3778/**
3779 * drbd_wait_misc  -  wait for a request to make progress
3780 * @device:     device associated with the request
3781 * @i:          the struct drbd_interval embedded in struct drbd_request or
3782 *              struct drbd_peer_request
3783 */
3784int drbd_wait_misc(struct drbd_device *device, struct drbd_interval *i)
3785{
3786        struct net_conf *nc;
3787        DEFINE_WAIT(wait);
3788        long timeout;
3789
3790        rcu_read_lock();
3791        nc = rcu_dereference(first_peer_device(device)->connection->net_conf);
3792        if (!nc) {
3793                rcu_read_unlock();
3794                return -ETIMEDOUT;
3795        }
3796        timeout = nc->ko_count ? nc->timeout * HZ / 10 * nc->ko_count : MAX_SCHEDULE_TIMEOUT;
3797        rcu_read_unlock();
3798
3799        /* Indicate to wake up device->misc_wait on progress.  */
3800        i->waiting = true;
3801        prepare_to_wait(&device->misc_wait, &wait, TASK_INTERRUPTIBLE);
3802        spin_unlock_irq(&device->resource->req_lock);
3803        timeout = schedule_timeout(timeout);
3804        finish_wait(&device->misc_wait, &wait);
3805        spin_lock_irq(&device->resource->req_lock);
3806        if (!timeout || device->state.conn < C_CONNECTED)
3807                return -ETIMEDOUT;
3808        if (signal_pending(current))
3809                return -ERESTARTSYS;
3810        return 0;
3811}
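/* Timeout arithmetic above, for illustration: nc->timeout is configured in
 * tenths of a second, so timeout * HZ / 10 converts it to jiffies, scaled by
 * ko_count. E.g. (hypothetical values) timeout = 60 (6 s) and ko_count = 7
 * give a 42 second wait; ko_count = 0 means wait without limit. */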
3812
3813void lock_all_resources(void)
3814{
3815        struct drbd_resource *resource;
3816        int __maybe_unused i = 0;
3817
3818        mutex_lock(&resources_mutex);
3819        local_irq_disable();
3820        for_each_resource(resource, &drbd_resources)
3821                spin_lock_nested(&resource->req_lock, i++);
3822}
3823
3824void unlock_all_resources(void)
3825{
3826        struct drbd_resource *resource;
3827
3828        for_each_resource(resource, &drbd_resources)
3829                spin_unlock(&resource->req_lock);
3830        local_irq_enable();
3831        mutex_unlock(&resources_mutex);
3832}
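/* Callers bracket global state changes with this pair, roughly:
 *
 *	lock_all_resources();
 *	// ... inspect or modify state that spans all resources ...
 *	unlock_all_resources();
 *
 * The increasing subclass passed to spin_lock_nested() above tells lockdep
 * that taking several req_locks of the same lock class is intentional. */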
3833
3834#ifdef CONFIG_DRBD_FAULT_INJECTION
3835/* Fault insertion support including random number generator shamelessly
3836 * stolen from kernel/rcutorture.c */
3837struct fault_random_state {
3838        unsigned long state;
3839        unsigned long count;
3840};
3841
3842#define FAULT_RANDOM_MULT 39916801  /* prime */
3843#define FAULT_RANDOM_ADD        479001701 /* prime */
3844#define FAULT_RANDOM_REFRESH 10000
3845
3846/*
3847 * Crude but fast random-number generator.  Uses a linear congruential
3848 * generator, with occasional help from get_random_bytes().
3849 */
3850static unsigned long
3851_drbd_fault_random(struct fault_random_state *rsp)
3852{
3853        long refresh;
3854
3855        if (!rsp->count--) {
3856                get_random_bytes(&refresh, sizeof(refresh));
3857                rsp->state += refresh;
3858                rsp->count = FAULT_RANDOM_REFRESH;
3859        }
3860        rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3861        return swahw32(rsp->state);
3862}
3863
3864static char *_drbd_fault_str(unsigned int type)
3865{
3866        static char *_faults[] = {
3867                [DRBD_FAULT_MD_WR] = "Meta-data write",
3868                [DRBD_FAULT_MD_RD] = "Meta-data read",
3869                [DRBD_FAULT_RS_WR] = "Resync write",
3870                [DRBD_FAULT_RS_RD] = "Resync read",
3871                [DRBD_FAULT_DT_WR] = "Data write",
3872                [DRBD_FAULT_DT_RD] = "Data read",
3873                [DRBD_FAULT_DT_RA] = "Data read ahead",
3874                [DRBD_FAULT_BM_ALLOC] = "BM allocation",
3875                [DRBD_FAULT_AL_EE] = "EE allocation",
3876                [DRBD_FAULT_RECEIVE] = "receive data corruption",
3877        };
3878
3879        return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3880}
3881
3882unsigned int
3883_drbd_insert_fault(struct drbd_device *device, unsigned int type)
3884{
3885        static struct fault_random_state rrs = {0, 0};
3886
3887        unsigned int ret = (
3888                (drbd_fault_devs == 0 ||
3889                        ((1 << device_to_minor(device)) & drbd_fault_devs) != 0) &&
3890                (((_drbd_fault_random(&rrs) % 100) + 1) <= drbd_fault_rate));
3891
3892        if (ret) {
3893                drbd_fault_count++;
3894
3895                if (__ratelimit(&drbd_ratelimit_state))
3896                        drbd_warn(device, "***Simulating %s failure\n",
3897                                _drbd_fault_str(type));
3898        }
3899
3900        return ret;
3901}
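/* Note on tuning (hedged; paths assume the usual module parameter sysfs
 * layout): with CONFIG_DRBD_FAULT_INJECTION, drbd_fault_rate is a percentage
 * and drbd_fault_devs a bitmask of minors (bit n == minor n); both are
 * normally adjustable at runtime under /sys/module/drbd/parameters/. */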
3902#endif
3903
3904const char *drbd_buildtag(void)
3905{
3906        /* DRBD built from external sources has here a reference to the
3907           git hash of the source code. */
3908
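        /* The string literal deliberately starts with a NUL byte: on the
         * first call buildtag[0] == 0, so a module fills in its srcversion,
         * while a built-in DRBD just replaces the NUL with 'b', turning the
         * remainder into "built-in". */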
3909        static char buildtag[38] = "\0uilt-in";
3910
3911        if (buildtag[0] == 0) {
3912#ifdef MODULE
3913                sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3914#else
3915                buildtag[0] = 'b';
3916#endif
3917        }
3918
3919        return buildtag;
3920}
3921
3922module_init(drbd_init)
3923module_exit(drbd_cleanup)
3924
3925EXPORT_SYMBOL(drbd_conn_str);
3926EXPORT_SYMBOL(drbd_role_str);
3927EXPORT_SYMBOL(drbd_disk_str);
3928EXPORT_SYMBOL(drbd_set_st_err_str);
3929