linux/fs/dlm/rcom.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0-only
   2/******************************************************************************
   3*******************************************************************************
   4**
   5**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
   6**  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
   7**
   8**
   9*******************************************************************************
  10******************************************************************************/
  11
  12#include "dlm_internal.h"
  13#include "lockspace.h"
  14#include "member.h"
  15#include "lowcomms.h"
  16#include "midcomms.h"
  17#include "rcom.h"
  18#include "recover.h"
  19#include "dir.h"
  20#include "config.h"
  21#include "memory.h"
  22#include "lock.h"
  23#include "util.h"
  24
  25static int rcom_response(struct dlm_ls *ls)
  26{
  27        return test_bit(LSFL_RCOM_READY, &ls->ls_flags);
  28}
  29
  30static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
  31                       struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret)
  32{
  33        struct dlm_rcom *rc;
  34        struct dlm_mhandle *mh;
  35        char *mb;
  36        int mb_len = sizeof(struct dlm_rcom) + len;
  37
  38        mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
  39        if (!mh) {
  40                log_print("create_rcom to %d type %d len %d ENOBUFS",
  41                          to_nodeid, type, len);
  42                return -ENOBUFS;
  43        }
  44        memset(mb, 0, mb_len);
  45
  46        rc = (struct dlm_rcom *) mb;
  47
  48        rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
  49        rc->rc_header.h_lockspace = ls->ls_global_id;
  50        rc->rc_header.h_nodeid = dlm_our_nodeid();
  51        rc->rc_header.h_length = mb_len;
  52        rc->rc_header.h_cmd = DLM_RCOM;
  53
  54        rc->rc_type = type;
  55
  56        spin_lock(&ls->ls_recover_lock);
  57        rc->rc_seq = ls->ls_recover_seq;
  58        spin_unlock(&ls->ls_recover_lock);
  59
  60        *mh_ret = mh;
  61        *rc_ret = rc;
  62        return 0;
  63}
  64
  65static void send_rcom(struct dlm_ls *ls, struct dlm_mhandle *mh,
  66                      struct dlm_rcom *rc)
  67{
  68        dlm_rcom_out(rc);
  69        dlm_lowcomms_commit_buffer(mh);
  70}
  71
  72static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs,
  73                            uint32_t flags)
  74{
  75        rs->rs_flags = cpu_to_le32(flags);
  76}
  77
  78/* When replying to a status request, a node also sends back its
  79   configuration values.  The requesting node then checks that the remote
  80   node is configured the same way as itself. */
  81
  82static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf,
  83                            uint32_t num_slots)
  84{
  85        rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen);
  86        rf->rf_lsflags = cpu_to_le32(ls->ls_exflags);
  87
  88        rf->rf_our_slot = cpu_to_le16(ls->ls_slot);
  89        rf->rf_num_slots = cpu_to_le16(num_slots);
  90        rf->rf_generation =  cpu_to_le32(ls->ls_generation);
  91}
  92
  93static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
  94{
  95        struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
  96
  97        if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) {
  98                log_error(ls, "version mismatch: %x nodeid %d: %x",
  99                          DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid,
 100                          rc->rc_header.h_version);
 101                return -EPROTO;
 102        }
 103
 104        if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen ||
 105            le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) {
 106                log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
 107                          ls->ls_lvblen, ls->ls_exflags, nodeid,
 108                          le32_to_cpu(rf->rf_lvblen),
 109                          le32_to_cpu(rf->rf_lsflags));
 110                return -EPROTO;
 111        }
 112        return 0;
 113}
 114
 115static void allow_sync_reply(struct dlm_ls *ls, uint64_t *new_seq)
 116{
 117        spin_lock(&ls->ls_rcom_spin);
 118        *new_seq = ++ls->ls_rcom_seq;
 119        set_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
 120        spin_unlock(&ls->ls_rcom_spin);
 121}
 122
 123static void disallow_sync_reply(struct dlm_ls *ls)
 124{
 125        spin_lock(&ls->ls_rcom_spin);
 126        clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
 127        clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
 128        spin_unlock(&ls->ls_rcom_spin);
 129}
 130
 131/*
 132 * low nodeid gathers one slot value at a time from each node.
 133 * it sets need_slots=0, and saves rf_our_slot returned from each
 134 * rcom_config.
 135 *
 136 * other nodes gather all slot values at once from the low nodeid.
 137 * they set need_slots=1, and ignore the rf_our_slot returned from each
 138 * rcom_config.  they use the rf_num_slots returned from the low
 139 * node's rcom_config.
 140 */
 141
 142int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
 143{
 144        struct dlm_rcom *rc;
 145        struct dlm_mhandle *mh;
 146        int error = 0;
 147
 148        ls->ls_recover_nodeid = nodeid;
 149
 150        if (nodeid == dlm_our_nodeid()) {
 151                rc = ls->ls_recover_buf;
 152                rc->rc_result = dlm_recover_status(ls);
 153                goto out;
 154        }
 155
 156retry:
 157        error = create_rcom(ls, nodeid, DLM_RCOM_STATUS,
 158                            sizeof(struct rcom_status), &rc, &mh);
 159        if (error)
 160                goto out;
 161
 162        set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags);
 163
 164        allow_sync_reply(ls, &rc->rc_id);
 165        memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
 166
 167        send_rcom(ls, mh, rc);
 168
 169        error = dlm_wait_function(ls, &rcom_response);
 170        disallow_sync_reply(ls);
 171        if (error == -ETIMEDOUT)
 172                goto retry;
 173        if (error)
 174                goto out;
 175
 176        rc = ls->ls_recover_buf;
 177
 178        if (rc->rc_result == -ESRCH) {
 179                /* we pretend the remote lockspace exists with 0 status */
 180                log_debug(ls, "remote node %d not ready", nodeid);
 181                rc->rc_result = 0;
 182                error = 0;
 183        } else {
 184                error = check_rcom_config(ls, rc, nodeid);
 185        }
 186
 187        /* the caller looks at rc_result for the remote recovery status */
 188 out:
 189        return error;
 190}
 191
 192static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 193{
 194        struct dlm_rcom *rc;
 195        struct dlm_mhandle *mh;
 196        struct rcom_status *rs;
 197        uint32_t status;
 198        int nodeid = rc_in->rc_header.h_nodeid;
 199        int len = sizeof(struct rcom_config);
 200        int num_slots = 0;
 201        int error;
 202
 203        if (!dlm_slots_version(&rc_in->rc_header)) {
 204                status = dlm_recover_status(ls);
 205                goto do_create;
 206        }
 207
 208        rs = (struct rcom_status *)rc_in->rc_buf;
 209
 210        if (!(le32_to_cpu(rs->rs_flags) & DLM_RSF_NEED_SLOTS)) {
 211                status = dlm_recover_status(ls);
 212                goto do_create;
 213        }
 214
 215        spin_lock(&ls->ls_recover_lock);
 216        status = ls->ls_recover_status;
 217        num_slots = ls->ls_num_slots;
 218        spin_unlock(&ls->ls_recover_lock);
 219        len += num_slots * sizeof(struct rcom_slot);
 220
 221 do_create:
 222        error = create_rcom(ls, nodeid, DLM_RCOM_STATUS_REPLY,
 223                            len, &rc, &mh);
 224        if (error)
 225                return;
 226
 227        rc->rc_id = rc_in->rc_id;
 228        rc->rc_seq_reply = rc_in->rc_seq;
 229        rc->rc_result = status;
 230
 231        set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots);
 232
 233        if (!num_slots)
 234                goto do_send;
 235
 236        spin_lock(&ls->ls_recover_lock);
 237        if (ls->ls_num_slots != num_slots) {
 238                spin_unlock(&ls->ls_recover_lock);
 239                log_debug(ls, "receive_rcom_status num_slots %d to %d",
 240                          num_slots, ls->ls_num_slots);
 241                rc->rc_result = 0;
 242                set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0);
 243                goto do_send;
 244        }
 245
 246        dlm_slots_copy_out(ls, rc);
 247        spin_unlock(&ls->ls_recover_lock);
 248
 249 do_send:
 250        send_rcom(ls, mh, rc);
 251}
 252
 253static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 254{
 255        spin_lock(&ls->ls_rcom_spin);
 256        if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
 257            rc_in->rc_id != ls->ls_rcom_seq) {
 258                log_debug(ls, "reject reply %d from %d seq %llx expect %llx",
 259                          rc_in->rc_type, rc_in->rc_header.h_nodeid,
 260                          (unsigned long long)rc_in->rc_id,
 261                          (unsigned long long)ls->ls_rcom_seq);
 262                goto out;
 263        }
 264        memcpy(ls->ls_recover_buf, rc_in, rc_in->rc_header.h_length);
 265        set_bit(LSFL_RCOM_READY, &ls->ls_flags);
 266        clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
 267        wake_up(&ls->ls_wait_general);
 268 out:
 269        spin_unlock(&ls->ls_rcom_spin);
 270}
 271
 272int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
 273{
 274        struct dlm_rcom *rc;
 275        struct dlm_mhandle *mh;
 276        int error = 0;
 277
 278        ls->ls_recover_nodeid = nodeid;
 279
 280retry:
 281        error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh);
 282        if (error)
 283                goto out;
 284        memcpy(rc->rc_buf, last_name, last_len);
 285
 286        allow_sync_reply(ls, &rc->rc_id);
 287        memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
 288
 289        send_rcom(ls, mh, rc);
 290
 291        error = dlm_wait_function(ls, &rcom_response);
 292        disallow_sync_reply(ls);
 293        if (error == -ETIMEDOUT)
 294                goto retry;
 295 out:
 296        return error;
 297}
 298
 299static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 300{
 301        struct dlm_rcom *rc;
 302        struct dlm_mhandle *mh;
 303        int error, inlen, outlen, nodeid;
 304
 305        nodeid = rc_in->rc_header.h_nodeid;
 306        inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
 307        outlen = dlm_config.ci_buffer_size - sizeof(struct dlm_rcom);
 308
 309        error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, &rc, &mh);
 310        if (error)
 311                return;
 312        rc->rc_id = rc_in->rc_id;
 313        rc->rc_seq_reply = rc_in->rc_seq;
 314
 315        dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
 316                              nodeid);
 317        send_rcom(ls, mh, rc);
 318}
 319
 320int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
 321{
 322        struct dlm_rcom *rc;
 323        struct dlm_mhandle *mh;
 324        struct dlm_ls *ls = r->res_ls;
 325        int error;
 326
 327        error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length,
 328                            &rc, &mh);
 329        if (error)
 330                goto out;
 331        memcpy(rc->rc_buf, r->res_name, r->res_length);
 332        rc->rc_id = (unsigned long) r->res_id;
 333
 334        send_rcom(ls, mh, rc);
 335 out:
 336        return error;
 337}
 338
 339static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 340{
 341        struct dlm_rcom *rc;
 342        struct dlm_mhandle *mh;
 343        int error, ret_nodeid, nodeid = rc_in->rc_header.h_nodeid;
 344        int len = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
 345
 346        error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh);
 347        if (error)
 348                return;
 349
 350        /* Old code would send this special id to trigger a debug dump. */
 351        if (rc_in->rc_id == 0xFFFFFFFF) {
 352                log_error(ls, "receive_rcom_lookup dump from %d", nodeid);
 353                dlm_dump_rsb_name(ls, rc_in->rc_buf, len);
 354                return;
 355        }
 356
 357        error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len,
 358                                  DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL);
 359        if (error)
 360                ret_nodeid = error;
 361        rc->rc_result = ret_nodeid;
 362        rc->rc_id = rc_in->rc_id;
 363        rc->rc_seq_reply = rc_in->rc_seq;
 364
 365        send_rcom(ls, mh, rc);
 366}
 367
 368static void receive_rcom_lookup_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 369{
 370        dlm_recover_master_reply(ls, rc_in);
 371}
 372
 373static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb,
 374                           struct rcom_lock *rl)
 375{
 376        memset(rl, 0, sizeof(*rl));
 377
 378        rl->rl_ownpid = cpu_to_le32(lkb->lkb_ownpid);
 379        rl->rl_lkid = cpu_to_le32(lkb->lkb_id);
 380        rl->rl_exflags = cpu_to_le32(lkb->lkb_exflags);
 381        rl->rl_flags = cpu_to_le32(lkb->lkb_flags);
 382        rl->rl_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
 383        rl->rl_rqmode = lkb->lkb_rqmode;
 384        rl->rl_grmode = lkb->lkb_grmode;
 385        rl->rl_status = lkb->lkb_status;
 386        rl->rl_wait_type = cpu_to_le16(lkb->lkb_wait_type);
 387
 388        if (lkb->lkb_bastfn)
 389                rl->rl_asts |= DLM_CB_BAST;
 390        if (lkb->lkb_astfn)
 391                rl->rl_asts |= DLM_CB_CAST;
 392
 393        rl->rl_namelen = cpu_to_le16(r->res_length);
 394        memcpy(rl->rl_name, r->res_name, r->res_length);
 395
 396        /* FIXME: might we have an lvb without DLM_LKF_VALBLK set ?
 397           If so, receive_rcom_lock_args() won't take this copy. */
 398
 399        if (lkb->lkb_lvbptr)
 400                memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
 401}
 402
 403int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
 404{
 405        struct dlm_ls *ls = r->res_ls;
 406        struct dlm_rcom *rc;
 407        struct dlm_mhandle *mh;
 408        struct rcom_lock *rl;
 409        int error, len = sizeof(struct rcom_lock);
 410
 411        if (lkb->lkb_lvbptr)
 412                len += ls->ls_lvblen;
 413
 414        error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh);
 415        if (error)
 416                goto out;
 417
 418        rl = (struct rcom_lock *) rc->rc_buf;
 419        pack_rcom_lock(r, lkb, rl);
 420        rc->rc_id = (unsigned long) r;
 421
 422        send_rcom(ls, mh, rc);
 423 out:
 424        return error;
 425}
 426
 427/* needs at least dlm_rcom + rcom_lock */
 428static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 429{
 430        struct dlm_rcom *rc;
 431        struct dlm_mhandle *mh;
 432        int error, nodeid = rc_in->rc_header.h_nodeid;
 433
 434        dlm_recover_master_copy(ls, rc_in);
 435
 436        error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY,
 437                            sizeof(struct rcom_lock), &rc, &mh);
 438        if (error)
 439                return;
 440
 441        /* We send back the same rcom_lock struct we received, but
 442           dlm_recover_master_copy() has filled in rl_remid and rl_result */
 443
 444        memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
 445        rc->rc_id = rc_in->rc_id;
 446        rc->rc_seq_reply = rc_in->rc_seq;
 447
 448        send_rcom(ls, mh, rc);
 449}
 450
 451/* If the lockspace doesn't exist then still send a status message
 452   back; it's possible that it just doesn't have its global_id yet. */
 453
 454int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
 455{
 456        struct dlm_rcom *rc;
 457        struct rcom_config *rf;
 458        struct dlm_mhandle *mh;
 459        char *mb;
 460        int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
 461
 462        mh = dlm_lowcomms_get_buffer(nodeid, mb_len, GFP_NOFS, &mb);
 463        if (!mh)
 464                return -ENOBUFS;
 465        memset(mb, 0, mb_len);
 466
 467        rc = (struct dlm_rcom *) mb;
 468
 469        rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
 470        rc->rc_header.h_lockspace = rc_in->rc_header.h_lockspace;
 471        rc->rc_header.h_nodeid = dlm_our_nodeid();
 472        rc->rc_header.h_length = mb_len;
 473        rc->rc_header.h_cmd = DLM_RCOM;
 474
 475        rc->rc_type = DLM_RCOM_STATUS_REPLY;
 476        rc->rc_id = rc_in->rc_id;
 477        rc->rc_seq_reply = rc_in->rc_seq;
 478        rc->rc_result = -ESRCH;
 479
 480        rf = (struct rcom_config *) rc->rc_buf;
 481        rf->rf_lvblen = cpu_to_le32(~0U);
 482
 483        dlm_rcom_out(rc);
 484        dlm_lowcomms_commit_buffer(mh);
 485
 486        return 0;
 487}
 488
 489/*
 490 * Ignore messages for stage Y before we set
 491 * recover_status bit for stage X:
 492 *
 493 * recover_status = 0
 494 *
 495 * dlm_recover_members()
 496 * - send nothing
 497 * - recv nothing
 498 * - ignore NAMES, NAMES_REPLY
 499 * - ignore LOOKUP, LOOKUP_REPLY
 500 * - ignore LOCK, LOCK_REPLY
 501 *
 502 * recover_status |= NODES
 503 *
 504 * dlm_recover_members_wait()
 505 *
 506 * dlm_recover_directory()
 507 * - send NAMES
 508 * - recv NAMES_REPLY
 509 * - ignore LOOKUP, LOOKUP_REPLY
 510 * - ignore LOCK, LOCK_REPLY
 511 *
 512 * recover_status |= DIR
 513 *
 514 * dlm_recover_directory_wait()
 515 *
 516 * dlm_recover_masters()
 517 * - send LOOKUP
 518 * - recv LOOKUP_REPLY
 519 *
 520 * dlm_recover_locks()
 521 * - send LOCKS
 522 * - recv LOCKS_REPLY
 523 *
 524 * recover_status |= LOCKS
 525 *
 526 * dlm_recover_locks_wait()
 527 *
 528 * recover_status |= DONE
 529 */
 530
 531/* Called by dlm_recv; corresponds to dlm_receive_message() but special
 532   recovery-only comms are sent through here. */
 533
 534void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
 535{
 536        int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
 537        int stop, reply = 0, names = 0, lookup = 0, lock = 0;
 538        uint32_t status;
 539        uint64_t seq;
 540
 541        switch (rc->rc_type) {
 542        case DLM_RCOM_STATUS_REPLY:
 543                reply = 1;
 544                break;
 545        case DLM_RCOM_NAMES:
 546                names = 1;
 547                break;
 548        case DLM_RCOM_NAMES_REPLY:
 549                names = 1;
 550                reply = 1;
 551                break;
 552        case DLM_RCOM_LOOKUP:
 553                lookup = 1;
 554                break;
 555        case DLM_RCOM_LOOKUP_REPLY:
 556                lookup = 1;
 557                reply = 1;
 558                break;
 559        case DLM_RCOM_LOCK:
 560                lock = 1;
 561                break;
 562        case DLM_RCOM_LOCK_REPLY:
 563                lock = 1;
 564                reply = 1;
 565                break;
 566        };
 567
 568        spin_lock(&ls->ls_recover_lock);
 569        status = ls->ls_recover_status;
 570        stop = test_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
 571        seq = ls->ls_recover_seq;
 572        spin_unlock(&ls->ls_recover_lock);
 573
 574        if (stop && (rc->rc_type != DLM_RCOM_STATUS))
 575                goto ignore;
 576
 577        if (reply && (rc->rc_seq_reply != seq))
 578                goto ignore;
 579
 580        if (!(status & DLM_RS_NODES) && (names || lookup || lock))
 581                goto ignore;
 582
 583        if (!(status & DLM_RS_DIR) && (lookup || lock))
 584                goto ignore;
 585
 586        switch (rc->rc_type) {
 587        case DLM_RCOM_STATUS:
 588                receive_rcom_status(ls, rc);
 589                break;
 590
 591        case DLM_RCOM_NAMES:
 592                receive_rcom_names(ls, rc);
 593                break;
 594
 595        case DLM_RCOM_LOOKUP:
 596                receive_rcom_lookup(ls, rc);
 597                break;
 598
 599        case DLM_RCOM_LOCK:
 600                if (rc->rc_header.h_length < lock_size)
 601                        goto Eshort;
 602                receive_rcom_lock(ls, rc);
 603                break;
 604
 605        case DLM_RCOM_STATUS_REPLY:
 606                receive_sync_reply(ls, rc);
 607                break;
 608
 609        case DLM_RCOM_NAMES_REPLY:
 610                receive_sync_reply(ls, rc);
 611                break;
 612
 613        case DLM_RCOM_LOOKUP_REPLY:
 614                receive_rcom_lookup_reply(ls, rc);
 615                break;
 616
 617        case DLM_RCOM_LOCK_REPLY:
 618                if (rc->rc_header.h_length < lock_size)
 619                        goto Eshort;
 620                dlm_recover_process_copy(ls, rc);
 621                break;
 622
 623        default:
 624                log_error(ls, "receive_rcom bad type %d", rc->rc_type);
 625        }
 626        return;
 627
 628ignore:
 629        log_limit(ls, "dlm_receive_rcom ignore msg %d "
 630                  "from %d %llu %llu recover seq %llu sts %x gen %u",
 631                   rc->rc_type,
 632                   nodeid,
 633                   (unsigned long long)rc->rc_seq,
 634                   (unsigned long long)rc->rc_seq_reply,
 635                   (unsigned long long)seq,
 636                   status, ls->ls_generation);
 637        return;
 638Eshort:
 639        log_error(ls, "recovery message %d from %d is too short",
 640                  rc->rc_type, nodeid);
 641}
 642
 643