linux/fs/dlm/rcom.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0-only
   2/******************************************************************************
   3*******************************************************************************
   4**
   5**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
   6**  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
   7**
   8**
   9*******************************************************************************
  10******************************************************************************/
  11
  12#include "dlm_internal.h"
  13#include "lockspace.h"
  14#include "member.h"
  15#include "lowcomms.h"
  16#include "midcomms.h"
  17#include "rcom.h"
  18#include "recover.h"
  19#include "dir.h"
  20#include "config.h"
  21#include "memory.h"
  22#include "lock.h"
  23#include "util.h"
  24
  25static int rcom_response(struct dlm_ls *ls)
  26{
  27        return test_bit(LSFL_RCOM_READY, &ls->ls_flags);
  28}
  29
  30static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
  31                       struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret)
  32{
  33        struct dlm_rcom *rc;
  34        struct dlm_mhandle *mh;
  35        char *mb;
  36        int mb_len = sizeof(struct dlm_rcom) + len;
  37
  38        mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
  39        if (!mh) {
  40                log_print("create_rcom to %d type %d len %d ENOBUFS",
  41                          to_nodeid, type, len);
  42                return -ENOBUFS;
  43        }
  44
  45        rc = (struct dlm_rcom *) mb;
  46
  47        rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
  48        rc->rc_header.h_lockspace = ls->ls_global_id;
  49        rc->rc_header.h_nodeid = dlm_our_nodeid();
  50        rc->rc_header.h_length = mb_len;
  51        rc->rc_header.h_cmd = DLM_RCOM;
  52
  53        rc->rc_type = type;
  54
  55        spin_lock(&ls->ls_recover_lock);
  56        rc->rc_seq = ls->ls_recover_seq;
  57        spin_unlock(&ls->ls_recover_lock);
  58
  59        *mh_ret = mh;
  60        *rc_ret = rc;
  61        return 0;
  62}
  63
  64static void send_rcom(struct dlm_ls *ls, struct dlm_mhandle *mh,
  65                      struct dlm_rcom *rc)
  66{
  67        dlm_rcom_out(rc);
  68        dlm_lowcomms_commit_buffer(mh);
  69}
  70
  71static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs,
  72                            uint32_t flags)
  73{
  74        rs->rs_flags = cpu_to_le32(flags);
  75}
  76
  77/* When replying to a status request, a node also sends back its
  78   configuration values.  The requesting node then checks that the remote
  79   node is configured the same way as itself. */
  80
  81static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf,
  82                            uint32_t num_slots)
  83{
  84        rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen);
  85        rf->rf_lsflags = cpu_to_le32(ls->ls_exflags);
  86
  87        rf->rf_our_slot = cpu_to_le16(ls->ls_slot);
  88        rf->rf_num_slots = cpu_to_le16(num_slots);
  89        rf->rf_generation =  cpu_to_le32(ls->ls_generation);
  90}
  91
  92static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
  93{
  94        struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
  95
  96        if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) {
  97                log_error(ls, "version mismatch: %x nodeid %d: %x",
  98                          DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid,
  99                          rc->rc_header.h_version);
 100                return -EPROTO;
 101        }
 102
 103        if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen ||
 104            le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) {
 105                log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
 106                          ls->ls_lvblen, ls->ls_exflags, nodeid,
 107                          le32_to_cpu(rf->rf_lvblen),
 108                          le32_to_cpu(rf->rf_lsflags));
 109                return -EPROTO;
 110        }
 111        return 0;
 112}
 113
 114static void allow_sync_reply(struct dlm_ls *ls, uint64_t *new_seq)
 115{
 116        spin_lock(&ls->ls_rcom_spin);
 117        *new_seq = ++ls->ls_rcom_seq;
 118        set_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
 119        spin_unlock(&ls->ls_rcom_spin);
 120}
 121
 122static void disallow_sync_reply(struct dlm_ls *ls)
 123{
 124        spin_lock(&ls->ls_rcom_spin);
 125        clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
 126        clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
 127        spin_unlock(&ls->ls_rcom_spin);
 128}
 129
 130/*
 131 * low nodeid gathers one slot value at a time from each node.
 132 * it sets need_slots=0, and saves rf_our_slot returned from each
 133 * rcom_config.
 134 *
 135 * other nodes gather all slot values at once from the low nodeid.
 136 * they set need_slots=1, and ignore the rf_our_slot returned from each
 137 * rcom_config.  they use the rf_num_slots returned from the low
 138 * node's rcom_config.
 139 */
 140
 141int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
 142{
 143        struct dlm_rcom *rc;
 144        struct dlm_mhandle *mh;
 145        int error = 0;
 146
 147        ls->ls_recover_nodeid = nodeid;
 148
 149        if (nodeid == dlm_our_nodeid()) {
 150                rc = ls->ls_recover_buf;
 151                rc->rc_result = dlm_recover_status(ls);
 152                goto out;
 153        }
 154
 155retry:
 156        error = create_rcom(ls, nodeid, DLM_RCOM_STATUS,
 157                            sizeof(struct rcom_status), &rc, &mh);
 158        if (error)
 159                goto out;
 160
 161        set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags);
 162
 163        allow_sync_reply(ls, &rc->rc_id);
 164        memset(ls->ls_recover_buf, 0, LOWCOMMS_MAX_TX_BUFFER_LEN);
 165
 166        send_rcom(ls, mh, rc);
 167
 168        error = dlm_wait_function(ls, &rcom_response);
 169        disallow_sync_reply(ls);
 170        if (error == -ETIMEDOUT)
 171                goto retry;
 172        if (error)
 173                goto out;
 174
 175        rc = ls->ls_recover_buf;
 176
 177        if (rc->rc_result == -ESRCH) {
 178                /* we pretend the remote lockspace exists with 0 status */
 179                log_debug(ls, "remote node %d not ready", nodeid);
 180                rc->rc_result = 0;
 181                error = 0;
 182        } else {
 183                error = check_rcom_config(ls, rc, nodeid);
 184        }
 185
 186        /* the caller looks at rc_result for the remote recovery status */
 187 out:
 188        return error;
 189}
 190
 191static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 192{
 193        struct dlm_rcom *rc;
 194        struct dlm_mhandle *mh;
 195        struct rcom_status *rs;
 196        uint32_t status;
 197        int nodeid = rc_in->rc_header.h_nodeid;
 198        int len = sizeof(struct rcom_config);
 199        int num_slots = 0;
 200        int error;
 201
 202        if (!dlm_slots_version(&rc_in->rc_header)) {
 203                status = dlm_recover_status(ls);
 204                goto do_create;
 205        }
 206
 207        rs = (struct rcom_status *)rc_in->rc_buf;
 208
 209        if (!(le32_to_cpu(rs->rs_flags) & DLM_RSF_NEED_SLOTS)) {
 210                status = dlm_recover_status(ls);
 211                goto do_create;
 212        }
 213
 214        spin_lock(&ls->ls_recover_lock);
 215        status = ls->ls_recover_status;
 216        num_slots = ls->ls_num_slots;
 217        spin_unlock(&ls->ls_recover_lock);
 218        len += num_slots * sizeof(struct rcom_slot);
 219
 220 do_create:
 221        error = create_rcom(ls, nodeid, DLM_RCOM_STATUS_REPLY,
 222                            len, &rc, &mh);
 223        if (error)
 224                return;
 225
 226        rc->rc_id = rc_in->rc_id;
 227        rc->rc_seq_reply = rc_in->rc_seq;
 228        rc->rc_result = status;
 229
 230        set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots);
 231
 232        if (!num_slots)
 233                goto do_send;
 234
 235        spin_lock(&ls->ls_recover_lock);
 236        if (ls->ls_num_slots != num_slots) {
 237                spin_unlock(&ls->ls_recover_lock);
 238                log_debug(ls, "receive_rcom_status num_slots %d to %d",
 239                          num_slots, ls->ls_num_slots);
 240                rc->rc_result = 0;
 241                set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0);
 242                goto do_send;
 243        }
 244
 245        dlm_slots_copy_out(ls, rc);
 246        spin_unlock(&ls->ls_recover_lock);
 247
 248 do_send:
 249        send_rcom(ls, mh, rc);
 250}
 251
 252static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 253{
 254        spin_lock(&ls->ls_rcom_spin);
 255        if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
 256            rc_in->rc_id != ls->ls_rcom_seq) {
 257                log_debug(ls, "reject reply %d from %d seq %llx expect %llx",
 258                          rc_in->rc_type, rc_in->rc_header.h_nodeid,
 259                          (unsigned long long)rc_in->rc_id,
 260                          (unsigned long long)ls->ls_rcom_seq);
 261                goto out;
 262        }
 263        memcpy(ls->ls_recover_buf, rc_in, rc_in->rc_header.h_length);
 264        set_bit(LSFL_RCOM_READY, &ls->ls_flags);
 265        clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
 266        wake_up(&ls->ls_wait_general);
 267 out:
 268        spin_unlock(&ls->ls_rcom_spin);
 269}
 270
 271int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
 272{
 273        struct dlm_rcom *rc;
 274        struct dlm_mhandle *mh;
 275        int error = 0;
 276
 277        ls->ls_recover_nodeid = nodeid;
 278
 279retry:
 280        error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh);
 281        if (error)
 282                goto out;
 283        memcpy(rc->rc_buf, last_name, last_len);
 284
 285        allow_sync_reply(ls, &rc->rc_id);
 286        memset(ls->ls_recover_buf, 0, LOWCOMMS_MAX_TX_BUFFER_LEN);
 287
 288        send_rcom(ls, mh, rc);
 289
 290        error = dlm_wait_function(ls, &rcom_response);
 291        disallow_sync_reply(ls);
 292        if (error == -ETIMEDOUT)
 293                goto retry;
 294 out:
 295        return error;
 296}
 297
 298static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 299{
 300        struct dlm_rcom *rc;
 301        struct dlm_mhandle *mh;
 302        int error, inlen, outlen, nodeid;
 303
 304        nodeid = rc_in->rc_header.h_nodeid;
 305        inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
 306        outlen = LOWCOMMS_MAX_TX_BUFFER_LEN - sizeof(struct dlm_rcom);
 307
 308        error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, &rc, &mh);
 309        if (error)
 310                return;
 311        rc->rc_id = rc_in->rc_id;
 312        rc->rc_seq_reply = rc_in->rc_seq;
 313
 314        dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
 315                              nodeid);
 316        send_rcom(ls, mh, rc);
 317}
 318
 319int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
 320{
 321        struct dlm_rcom *rc;
 322        struct dlm_mhandle *mh;
 323        struct dlm_ls *ls = r->res_ls;
 324        int error;
 325
 326        error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length,
 327                            &rc, &mh);
 328        if (error)
 329                goto out;
 330        memcpy(rc->rc_buf, r->res_name, r->res_length);
 331        rc->rc_id = (unsigned long) r->res_id;
 332
 333        send_rcom(ls, mh, rc);
 334 out:
 335        return error;
 336}
 337
 338static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 339{
 340        struct dlm_rcom *rc;
 341        struct dlm_mhandle *mh;
 342        int error, ret_nodeid, nodeid = rc_in->rc_header.h_nodeid;
 343        int len = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
 344
 345        error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh);
 346        if (error)
 347                return;
 348
 349        /* Old code would send this special id to trigger a debug dump. */
 350        if (rc_in->rc_id == 0xFFFFFFFF) {
 351                log_error(ls, "receive_rcom_lookup dump from %d", nodeid);
 352                dlm_dump_rsb_name(ls, rc_in->rc_buf, len);
 353                return;
 354        }
 355
 356        error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len,
 357                                  DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL);
 358        if (error)
 359                ret_nodeid = error;
 360        rc->rc_result = ret_nodeid;
 361        rc->rc_id = rc_in->rc_id;
 362        rc->rc_seq_reply = rc_in->rc_seq;
 363
 364        send_rcom(ls, mh, rc);
 365}
 366
 367static void receive_rcom_lookup_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 368{
 369        dlm_recover_master_reply(ls, rc_in);
 370}
 371
 372static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb,
 373                           struct rcom_lock *rl)
 374{
 375        memset(rl, 0, sizeof(*rl));
 376
 377        rl->rl_ownpid = cpu_to_le32(lkb->lkb_ownpid);
 378        rl->rl_lkid = cpu_to_le32(lkb->lkb_id);
 379        rl->rl_exflags = cpu_to_le32(lkb->lkb_exflags);
 380        rl->rl_flags = cpu_to_le32(lkb->lkb_flags);
 381        rl->rl_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
 382        rl->rl_rqmode = lkb->lkb_rqmode;
 383        rl->rl_grmode = lkb->lkb_grmode;
 384        rl->rl_status = lkb->lkb_status;
 385        rl->rl_wait_type = cpu_to_le16(lkb->lkb_wait_type);
 386
 387        if (lkb->lkb_bastfn)
 388                rl->rl_asts |= DLM_CB_BAST;
 389        if (lkb->lkb_astfn)
 390                rl->rl_asts |= DLM_CB_CAST;
 391
 392        rl->rl_namelen = cpu_to_le16(r->res_length);
 393        memcpy(rl->rl_name, r->res_name, r->res_length);
 394
 395        /* FIXME: might we have an lvb without DLM_LKF_VALBLK set ?
 396           If so, receive_rcom_lock_args() won't take this copy. */
 397
 398        if (lkb->lkb_lvbptr)
 399                memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
 400}
 401
 402int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
 403{
 404        struct dlm_ls *ls = r->res_ls;
 405        struct dlm_rcom *rc;
 406        struct dlm_mhandle *mh;
 407        struct rcom_lock *rl;
 408        int error, len = sizeof(struct rcom_lock);
 409
 410        if (lkb->lkb_lvbptr)
 411                len += ls->ls_lvblen;
 412
 413        error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh);
 414        if (error)
 415                goto out;
 416
 417        rl = (struct rcom_lock *) rc->rc_buf;
 418        pack_rcom_lock(r, lkb, rl);
 419        rc->rc_id = (unsigned long) r;
 420
 421        send_rcom(ls, mh, rc);
 422 out:
 423        return error;
 424}
 425
 426/* needs at least dlm_rcom + rcom_lock */
 427static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 428{
 429        struct dlm_rcom *rc;
 430        struct dlm_mhandle *mh;
 431        int error, nodeid = rc_in->rc_header.h_nodeid;
 432
 433        dlm_recover_master_copy(ls, rc_in);
 434
 435        error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY,
 436                            sizeof(struct rcom_lock), &rc, &mh);
 437        if (error)
 438                return;
 439
 440        /* We send back the same rcom_lock struct we received, but
 441           dlm_recover_master_copy() has filled in rl_remid and rl_result */
 442
 443        memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
 444        rc->rc_id = rc_in->rc_id;
 445        rc->rc_seq_reply = rc_in->rc_seq;
 446
 447        send_rcom(ls, mh, rc);
 448}
 449
 450/* If the lockspace doesn't exist then still send a status message
 451   back; it's possible that it just doesn't have its global_id yet. */
 452
 453int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
 454{
 455        struct dlm_rcom *rc;
 456        struct rcom_config *rf;
 457        struct dlm_mhandle *mh;
 458        char *mb;
 459        int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
 460
 461        mh = dlm_lowcomms_get_buffer(nodeid, mb_len, GFP_NOFS, &mb);
 462        if (!mh)
 463                return -ENOBUFS;
 464
 465        rc = (struct dlm_rcom *) mb;
 466
 467        rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
 468        rc->rc_header.h_lockspace = rc_in->rc_header.h_lockspace;
 469        rc->rc_header.h_nodeid = dlm_our_nodeid();
 470        rc->rc_header.h_length = mb_len;
 471        rc->rc_header.h_cmd = DLM_RCOM;
 472
 473        rc->rc_type = DLM_RCOM_STATUS_REPLY;
 474        rc->rc_id = rc_in->rc_id;
 475        rc->rc_seq_reply = rc_in->rc_seq;
 476        rc->rc_result = -ESRCH;
 477
 478        rf = (struct rcom_config *) rc->rc_buf;
 479        rf->rf_lvblen = cpu_to_le32(~0U);
 480
 481        dlm_rcom_out(rc);
 482        dlm_lowcomms_commit_buffer(mh);
 483
 484        return 0;
 485}
 486
 487/*
 488 * Ignore messages for stage Y before we set
 489 * recover_status bit for stage X:
 490 *
 491 * recover_status = 0
 492 *
 493 * dlm_recover_members()
 494 * - send nothing
 495 * - recv nothing
 496 * - ignore NAMES, NAMES_REPLY
 497 * - ignore LOOKUP, LOOKUP_REPLY
 498 * - ignore LOCK, LOCK_REPLY
 499 *
 500 * recover_status |= NODES
 501 *
 502 * dlm_recover_members_wait()
 503 *
 504 * dlm_recover_directory()
 505 * - send NAMES
 506 * - recv NAMES_REPLY
 507 * - ignore LOOKUP, LOOKUP_REPLY
 508 * - ignore LOCK, LOCK_REPLY
 509 *
 510 * recover_status |= DIR
 511 *
 512 * dlm_recover_directory_wait()
 513 *
 514 * dlm_recover_masters()
 515 * - send LOOKUP
 516 * - recv LOOKUP_REPLY
 517 *
 518 * dlm_recover_locks()
 519 * - send LOCKS
 520 * - recv LOCKS_REPLY
 521 *
 522 * recover_status |= LOCKS
 523 *
 524 * dlm_recover_locks_wait()
 525 *
 526 * recover_status |= DONE
 527 */
 528
 529/* Called by dlm_recv; corresponds to dlm_receive_message() but special
 530   recovery-only comms are sent through here. */
 531
 532void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
 533{
 534        int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
 535        int stop, reply = 0, names = 0, lookup = 0, lock = 0;
 536        uint32_t status;
 537        uint64_t seq;
 538
 539        switch (rc->rc_type) {
 540        case DLM_RCOM_STATUS_REPLY:
 541                reply = 1;
 542                break;
 543        case DLM_RCOM_NAMES:
 544                names = 1;
 545                break;
 546        case DLM_RCOM_NAMES_REPLY:
 547                names = 1;
 548                reply = 1;
 549                break;
 550        case DLM_RCOM_LOOKUP:
 551                lookup = 1;
 552                break;
 553        case DLM_RCOM_LOOKUP_REPLY:
 554                lookup = 1;
 555                reply = 1;
 556                break;
 557        case DLM_RCOM_LOCK:
 558                lock = 1;
 559                break;
 560        case DLM_RCOM_LOCK_REPLY:
 561                lock = 1;
 562                reply = 1;
 563                break;
 564        }
 565
 566        spin_lock(&ls->ls_recover_lock);
 567        status = ls->ls_recover_status;
 568        stop = test_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
 569        seq = ls->ls_recover_seq;
 570        spin_unlock(&ls->ls_recover_lock);
 571
 572        if (stop && (rc->rc_type != DLM_RCOM_STATUS))
 573                goto ignore;
 574
 575        if (reply && (rc->rc_seq_reply != seq))
 576                goto ignore;
 577
 578        if (!(status & DLM_RS_NODES) && (names || lookup || lock))
 579                goto ignore;
 580
 581        if (!(status & DLM_RS_DIR) && (lookup || lock))
 582                goto ignore;
 583
 584        switch (rc->rc_type) {
 585        case DLM_RCOM_STATUS:
 586                receive_rcom_status(ls, rc);
 587                break;
 588
 589        case DLM_RCOM_NAMES:
 590                receive_rcom_names(ls, rc);
 591                break;
 592
 593        case DLM_RCOM_LOOKUP:
 594                receive_rcom_lookup(ls, rc);
 595                break;
 596
 597        case DLM_RCOM_LOCK:
 598                if (rc->rc_header.h_length < lock_size)
 599                        goto Eshort;
 600                receive_rcom_lock(ls, rc);
 601                break;
 602
 603        case DLM_RCOM_STATUS_REPLY:
 604                receive_sync_reply(ls, rc);
 605                break;
 606
 607        case DLM_RCOM_NAMES_REPLY:
 608                receive_sync_reply(ls, rc);
 609                break;
 610
 611        case DLM_RCOM_LOOKUP_REPLY:
 612                receive_rcom_lookup_reply(ls, rc);
 613                break;
 614
 615        case DLM_RCOM_LOCK_REPLY:
 616                if (rc->rc_header.h_length < lock_size)
 617                        goto Eshort;
 618                dlm_recover_process_copy(ls, rc);
 619                break;
 620
 621        default:
 622                log_error(ls, "receive_rcom bad type %d", rc->rc_type);
 623        }
 624        return;
 625
 626ignore:
 627        log_limit(ls, "dlm_receive_rcom ignore msg %d "
 628                  "from %d %llu %llu recover seq %llu sts %x gen %u",
 629                   rc->rc_type,
 630                   nodeid,
 631                   (unsigned long long)rc->rc_seq,
 632                   (unsigned long long)rc->rc_seq_reply,
 633                   (unsigned long long)seq,
 634                   status, ls->ls_generation);
 635        return;
 636Eshort:
 637        log_error(ls, "recovery message %d from %d is too short",
 638                  rc->rc_type, nodeid);
 639}
 640
 641