linux/fs/dlm/rcom.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0-only
   2/******************************************************************************
   3*******************************************************************************
   4**
   5**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
   6**  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
   7**
   8**
   9*******************************************************************************
  10******************************************************************************/
  11
  12#include "dlm_internal.h"
  13#include "lockspace.h"
  14#include "member.h"
  15#include "lowcomms.h"
  16#include "midcomms.h"
  17#include "rcom.h"
  18#include "recover.h"
  19#include "dir.h"
  20#include "config.h"
  21#include "memory.h"
  22#include "lock.h"
  23#include "util.h"
  24
  25static int rcom_response(struct dlm_ls *ls)
  26{
  27        return test_bit(LSFL_RCOM_READY, &ls->ls_flags);
  28}
  29
  30static void _create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
  31                         struct dlm_rcom **rc_ret, char *mb, int mb_len)
  32{
  33        struct dlm_rcom *rc;
  34
  35        rc = (struct dlm_rcom *) mb;
  36
  37        rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
  38        rc->rc_header.u.h_lockspace = ls->ls_global_id;
  39        rc->rc_header.h_nodeid = dlm_our_nodeid();
  40        rc->rc_header.h_length = mb_len;
  41        rc->rc_header.h_cmd = DLM_RCOM;
  42
  43        rc->rc_type = type;
  44
  45        spin_lock(&ls->ls_recover_lock);
  46        rc->rc_seq = ls->ls_recover_seq;
  47        spin_unlock(&ls->ls_recover_lock);
  48
  49        *rc_ret = rc;
  50}
  51
  52static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
  53                       struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret)
  54{
  55        int mb_len = sizeof(struct dlm_rcom) + len;
  56        struct dlm_mhandle *mh;
  57        char *mb;
  58
  59        mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, GFP_NOFS, &mb);
  60        if (!mh) {
  61                log_print("%s to %d type %d len %d ENOBUFS",
  62                          __func__, to_nodeid, type, len);
  63                return -ENOBUFS;
  64        }
  65
  66        _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len);
  67        *mh_ret = mh;
  68        return 0;
  69}
  70
  71static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type,
  72                                 int len, struct dlm_rcom **rc_ret,
  73                                 struct dlm_msg **msg_ret)
  74{
  75        int mb_len = sizeof(struct dlm_rcom) + len;
  76        struct dlm_msg *msg;
  77        char *mb;
  78
  79        msg = dlm_lowcomms_new_msg(to_nodeid, mb_len, GFP_NOFS, &mb,
  80                                   NULL, NULL);
  81        if (!msg) {
  82                log_print("create_rcom to %d type %d len %d ENOBUFS",
  83                          to_nodeid, type, len);
  84                return -ENOBUFS;
  85        }
  86
  87        _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len);
  88        *msg_ret = msg;
  89        return 0;
  90}
  91
  92static void send_rcom(struct dlm_mhandle *mh, struct dlm_rcom *rc)
  93{
  94        dlm_rcom_out(rc);
  95        dlm_midcomms_commit_mhandle(mh);
  96}
  97
  98static void send_rcom_stateless(struct dlm_msg *msg, struct dlm_rcom *rc)
  99{
 100        dlm_rcom_out(rc);
 101        dlm_lowcomms_commit_msg(msg);
 102        dlm_lowcomms_put_msg(msg);
 103}
 104
 105static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs,
 106                            uint32_t flags)
 107{
 108        rs->rs_flags = cpu_to_le32(flags);
 109}
 110
 111/* When replying to a status request, a node also sends back its
 112   configuration values.  The requesting node then checks that the remote
 113   node is configured the same way as itself. */
 114
 115static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf,
 116                            uint32_t num_slots)
 117{
 118        rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen);
 119        rf->rf_lsflags = cpu_to_le32(ls->ls_exflags);
 120
 121        rf->rf_our_slot = cpu_to_le16(ls->ls_slot);
 122        rf->rf_num_slots = cpu_to_le16(num_slots);
 123        rf->rf_generation =  cpu_to_le32(ls->ls_generation);
 124}
 125
 126static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
 127{
 128        struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
 129
 130        if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) {
 131                log_error(ls, "version mismatch: %x nodeid %d: %x",
 132                          DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid,
 133                          rc->rc_header.h_version);
 134                return -EPROTO;
 135        }
 136
 137        if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen ||
 138            le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) {
 139                log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
 140                          ls->ls_lvblen, ls->ls_exflags, nodeid,
 141                          le32_to_cpu(rf->rf_lvblen),
 142                          le32_to_cpu(rf->rf_lsflags));
 143                return -EPROTO;
 144        }
 145        return 0;
 146}
 147
 148static void allow_sync_reply(struct dlm_ls *ls, uint64_t *new_seq)
 149{
 150        spin_lock(&ls->ls_rcom_spin);
 151        *new_seq = ++ls->ls_rcom_seq;
 152        set_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
 153        spin_unlock(&ls->ls_rcom_spin);
 154}
 155
 156static void disallow_sync_reply(struct dlm_ls *ls)
 157{
 158        spin_lock(&ls->ls_rcom_spin);
 159        clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
 160        clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
 161        spin_unlock(&ls->ls_rcom_spin);
 162}
 163
 164/*
 165 * low nodeid gathers one slot value at a time from each node.
 166 * it sets need_slots=0, and saves rf_our_slot returned from each
 167 * rcom_config.
 168 *
 169 * other nodes gather all slot values at once from the low nodeid.
 170 * they set need_slots=1, and ignore the rf_our_slot returned from each
 171 * rcom_config.  they use the rf_num_slots returned from the low
 172 * node's rcom_config.
 173 */
 174
 175int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
 176{
 177        struct dlm_rcom *rc;
 178        struct dlm_msg *msg;
 179        int error = 0;
 180
 181        ls->ls_recover_nodeid = nodeid;
 182
 183        if (nodeid == dlm_our_nodeid()) {
 184                rc = ls->ls_recover_buf;
 185                rc->rc_result = dlm_recover_status(ls);
 186                goto out;
 187        }
 188
 189retry:
 190        error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS,
 191                                      sizeof(struct rcom_status), &rc, &msg);
 192        if (error)
 193                goto out;
 194
 195        set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags);
 196
 197        allow_sync_reply(ls, &rc->rc_id);
 198        memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE);
 199
 200        send_rcom_stateless(msg, rc);
 201
 202        error = dlm_wait_function(ls, &rcom_response);
 203        disallow_sync_reply(ls);
 204        if (error == -ETIMEDOUT)
 205                goto retry;
 206        if (error)
 207                goto out;
 208
 209        rc = ls->ls_recover_buf;
 210
 211        if (rc->rc_result == -ESRCH) {
 212                /* we pretend the remote lockspace exists with 0 status */
 213                log_debug(ls, "remote node %d not ready", nodeid);
 214                rc->rc_result = 0;
 215                error = 0;
 216        } else {
 217                error = check_rcom_config(ls, rc, nodeid);
 218        }
 219
 220        /* the caller looks at rc_result for the remote recovery status */
 221 out:
 222        return error;
 223}
 224
 225static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 226{
 227        struct dlm_rcom *rc;
 228        struct rcom_status *rs;
 229        uint32_t status;
 230        int nodeid = rc_in->rc_header.h_nodeid;
 231        int len = sizeof(struct rcom_config);
 232        struct dlm_msg *msg;
 233        int num_slots = 0;
 234        int error;
 235
 236        if (!dlm_slots_version(&rc_in->rc_header)) {
 237                status = dlm_recover_status(ls);
 238                goto do_create;
 239        }
 240
 241        rs = (struct rcom_status *)rc_in->rc_buf;
 242
 243        if (!(le32_to_cpu(rs->rs_flags) & DLM_RSF_NEED_SLOTS)) {
 244                status = dlm_recover_status(ls);
 245                goto do_create;
 246        }
 247
 248        spin_lock(&ls->ls_recover_lock);
 249        status = ls->ls_recover_status;
 250        num_slots = ls->ls_num_slots;
 251        spin_unlock(&ls->ls_recover_lock);
 252        len += num_slots * sizeof(struct rcom_slot);
 253
 254 do_create:
 255        error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS_REPLY,
 256                                      len, &rc, &msg);
 257        if (error)
 258                return;
 259
 260        rc->rc_id = rc_in->rc_id;
 261        rc->rc_seq_reply = rc_in->rc_seq;
 262        rc->rc_result = status;
 263
 264        set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots);
 265
 266        if (!num_slots)
 267                goto do_send;
 268
 269        spin_lock(&ls->ls_recover_lock);
 270        if (ls->ls_num_slots != num_slots) {
 271                spin_unlock(&ls->ls_recover_lock);
 272                log_debug(ls, "receive_rcom_status num_slots %d to %d",
 273                          num_slots, ls->ls_num_slots);
 274                rc->rc_result = 0;
 275                set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0);
 276                goto do_send;
 277        }
 278
 279        dlm_slots_copy_out(ls, rc);
 280        spin_unlock(&ls->ls_recover_lock);
 281
 282 do_send:
 283        send_rcom_stateless(msg, rc);
 284}
 285
 286static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 287{
 288        spin_lock(&ls->ls_rcom_spin);
 289        if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
 290            rc_in->rc_id != ls->ls_rcom_seq) {
 291                log_debug(ls, "reject reply %d from %d seq %llx expect %llx",
 292                          rc_in->rc_type, rc_in->rc_header.h_nodeid,
 293                          (unsigned long long)rc_in->rc_id,
 294                          (unsigned long long)ls->ls_rcom_seq);
 295                goto out;
 296        }
 297        memcpy(ls->ls_recover_buf, rc_in, rc_in->rc_header.h_length);
 298        set_bit(LSFL_RCOM_READY, &ls->ls_flags);
 299        clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
 300        wake_up(&ls->ls_wait_general);
 301 out:
 302        spin_unlock(&ls->ls_rcom_spin);
 303}
 304
 305int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
 306{
 307        struct dlm_rcom *rc;
 308        struct dlm_msg *msg;
 309        int error = 0;
 310
 311        ls->ls_recover_nodeid = nodeid;
 312
 313retry:
 314        error = create_rcom_stateless(ls, nodeid, DLM_RCOM_NAMES, last_len,
 315                                      &rc, &msg);
 316        if (error)
 317                goto out;
 318        memcpy(rc->rc_buf, last_name, last_len);
 319
 320        allow_sync_reply(ls, &rc->rc_id);
 321        memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE);
 322
 323        send_rcom_stateless(msg, rc);
 324
 325        error = dlm_wait_function(ls, &rcom_response);
 326        disallow_sync_reply(ls);
 327        if (error == -ETIMEDOUT)
 328                goto retry;
 329 out:
 330        return error;
 331}
 332
 333static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 334{
 335        struct dlm_rcom *rc;
 336        int error, inlen, outlen, nodeid;
 337        struct dlm_msg *msg;
 338
 339        nodeid = rc_in->rc_header.h_nodeid;
 340        inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
 341        outlen = DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom);
 342
 343        error = create_rcom_stateless(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen,
 344                                      &rc, &msg);
 345        if (error)
 346                return;
 347        rc->rc_id = rc_in->rc_id;
 348        rc->rc_seq_reply = rc_in->rc_seq;
 349
 350        dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
 351                              nodeid);
 352        send_rcom_stateless(msg, rc);
 353}
 354
 355int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
 356{
 357        struct dlm_rcom *rc;
 358        struct dlm_mhandle *mh;
 359        struct dlm_ls *ls = r->res_ls;
 360        int error;
 361
 362        error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length,
 363                            &rc, &mh);
 364        if (error)
 365                goto out;
 366        memcpy(rc->rc_buf, r->res_name, r->res_length);
 367        rc->rc_id = (unsigned long) r->res_id;
 368
 369        send_rcom(mh, rc);
 370 out:
 371        return error;
 372}
 373
 374static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 375{
 376        struct dlm_rcom *rc;
 377        struct dlm_mhandle *mh;
 378        int error, ret_nodeid, nodeid = rc_in->rc_header.h_nodeid;
 379        int len = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
 380
 381        /* Old code would send this special id to trigger a debug dump. */
 382        if (rc_in->rc_id == 0xFFFFFFFF) {
 383                log_error(ls, "receive_rcom_lookup dump from %d", nodeid);
 384                dlm_dump_rsb_name(ls, rc_in->rc_buf, len);
 385                return;
 386        }
 387
 388        error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh);
 389        if (error)
 390                return;
 391
 392        error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len,
 393                                  DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL);
 394        if (error)
 395                ret_nodeid = error;
 396        rc->rc_result = ret_nodeid;
 397        rc->rc_id = rc_in->rc_id;
 398        rc->rc_seq_reply = rc_in->rc_seq;
 399
 400        send_rcom(mh, rc);
 401}
 402
 403static void receive_rcom_lookup_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 404{
 405        dlm_recover_master_reply(ls, rc_in);
 406}
 407
 408static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb,
 409                           struct rcom_lock *rl)
 410{
 411        memset(rl, 0, sizeof(*rl));
 412
 413        rl->rl_ownpid = cpu_to_le32(lkb->lkb_ownpid);
 414        rl->rl_lkid = cpu_to_le32(lkb->lkb_id);
 415        rl->rl_exflags = cpu_to_le32(lkb->lkb_exflags);
 416        rl->rl_flags = cpu_to_le32(lkb->lkb_flags);
 417        rl->rl_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
 418        rl->rl_rqmode = lkb->lkb_rqmode;
 419        rl->rl_grmode = lkb->lkb_grmode;
 420        rl->rl_status = lkb->lkb_status;
 421        rl->rl_wait_type = cpu_to_le16(lkb->lkb_wait_type);
 422
 423        if (lkb->lkb_bastfn)
 424                rl->rl_asts |= DLM_CB_BAST;
 425        if (lkb->lkb_astfn)
 426                rl->rl_asts |= DLM_CB_CAST;
 427
 428        rl->rl_namelen = cpu_to_le16(r->res_length);
 429        memcpy(rl->rl_name, r->res_name, r->res_length);
 430
 431        /* FIXME: might we have an lvb without DLM_LKF_VALBLK set ?
 432           If so, receive_rcom_lock_args() won't take this copy. */
 433
 434        if (lkb->lkb_lvbptr)
 435                memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
 436}
 437
 438int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
 439{
 440        struct dlm_ls *ls = r->res_ls;
 441        struct dlm_rcom *rc;
 442        struct dlm_mhandle *mh;
 443        struct rcom_lock *rl;
 444        int error, len = sizeof(struct rcom_lock);
 445
 446        if (lkb->lkb_lvbptr)
 447                len += ls->ls_lvblen;
 448
 449        error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh);
 450        if (error)
 451                goto out;
 452
 453        rl = (struct rcom_lock *) rc->rc_buf;
 454        pack_rcom_lock(r, lkb, rl);
 455        rc->rc_id = (unsigned long) r;
 456
 457        send_rcom(mh, rc);
 458 out:
 459        return error;
 460}
 461
 462/* needs at least dlm_rcom + rcom_lock */
 463static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 464{
 465        struct dlm_rcom *rc;
 466        struct dlm_mhandle *mh;
 467        int error, nodeid = rc_in->rc_header.h_nodeid;
 468
 469        dlm_recover_master_copy(ls, rc_in);
 470
 471        error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY,
 472                            sizeof(struct rcom_lock), &rc, &mh);
 473        if (error)
 474                return;
 475
 476        /* We send back the same rcom_lock struct we received, but
 477           dlm_recover_master_copy() has filled in rl_remid and rl_result */
 478
 479        memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
 480        rc->rc_id = rc_in->rc_id;
 481        rc->rc_seq_reply = rc_in->rc_seq;
 482
 483        send_rcom(mh, rc);
 484}
 485
 486/* If the lockspace doesn't exist then still send a status message
 487   back; it's possible that it just doesn't have its global_id yet. */
 488
 489int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
 490{
 491        struct dlm_rcom *rc;
 492        struct rcom_config *rf;
 493        struct dlm_mhandle *mh;
 494        char *mb;
 495        int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
 496
 497        mh = dlm_midcomms_get_mhandle(nodeid, mb_len, GFP_NOFS, &mb);
 498        if (!mh)
 499                return -ENOBUFS;
 500
 501        rc = (struct dlm_rcom *) mb;
 502
 503        rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
 504        rc->rc_header.u.h_lockspace = rc_in->rc_header.u.h_lockspace;
 505        rc->rc_header.h_nodeid = dlm_our_nodeid();
 506        rc->rc_header.h_length = mb_len;
 507        rc->rc_header.h_cmd = DLM_RCOM;
 508
 509        rc->rc_type = DLM_RCOM_STATUS_REPLY;
 510        rc->rc_id = rc_in->rc_id;
 511        rc->rc_seq_reply = rc_in->rc_seq;
 512        rc->rc_result = -ESRCH;
 513
 514        rf = (struct rcom_config *) rc->rc_buf;
 515        rf->rf_lvblen = cpu_to_le32(~0U);
 516
 517        dlm_rcom_out(rc);
 518        dlm_midcomms_commit_mhandle(mh);
 519
 520        return 0;
 521}
 522
 523/*
 524 * Ignore messages for stage Y before we set
 525 * recover_status bit for stage X:
 526 *
 527 * recover_status = 0
 528 *
 529 * dlm_recover_members()
 530 * - send nothing
 531 * - recv nothing
 532 * - ignore NAMES, NAMES_REPLY
 533 * - ignore LOOKUP, LOOKUP_REPLY
 534 * - ignore LOCK, LOCK_REPLY
 535 *
 536 * recover_status |= NODES
 537 *
 538 * dlm_recover_members_wait()
 539 *
 540 * dlm_recover_directory()
 541 * - send NAMES
 542 * - recv NAMES_REPLY
 543 * - ignore LOOKUP, LOOKUP_REPLY
 544 * - ignore LOCK, LOCK_REPLY
 545 *
 546 * recover_status |= DIR
 547 *
 548 * dlm_recover_directory_wait()
 549 *
 550 * dlm_recover_masters()
 551 * - send LOOKUP
 552 * - recv LOOKUP_REPLY
 553 *
 554 * dlm_recover_locks()
 555 * - send LOCKS
 556 * - recv LOCKS_REPLY
 557 *
 558 * recover_status |= LOCKS
 559 *
 560 * dlm_recover_locks_wait()
 561 *
 562 * recover_status |= DONE
 563 */
 564
 565/* Called by dlm_recv; corresponds to dlm_receive_message() but special
 566   recovery-only comms are sent through here. */
 567
 568void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
 569{
 570        int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
 571        int stop, reply = 0, names = 0, lookup = 0, lock = 0;
 572        uint32_t status;
 573        uint64_t seq;
 574
 575        switch (rc->rc_type) {
 576        case DLM_RCOM_STATUS_REPLY:
 577                reply = 1;
 578                break;
 579        case DLM_RCOM_NAMES:
 580                names = 1;
 581                break;
 582        case DLM_RCOM_NAMES_REPLY:
 583                names = 1;
 584                reply = 1;
 585                break;
 586        case DLM_RCOM_LOOKUP:
 587                lookup = 1;
 588                break;
 589        case DLM_RCOM_LOOKUP_REPLY:
 590                lookup = 1;
 591                reply = 1;
 592                break;
 593        case DLM_RCOM_LOCK:
 594                lock = 1;
 595                break;
 596        case DLM_RCOM_LOCK_REPLY:
 597                lock = 1;
 598                reply = 1;
 599                break;
 600        }
 601
 602        spin_lock(&ls->ls_recover_lock);
 603        status = ls->ls_recover_status;
 604        stop = test_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
 605        seq = ls->ls_recover_seq;
 606        spin_unlock(&ls->ls_recover_lock);
 607
 608        if (stop && (rc->rc_type != DLM_RCOM_STATUS))
 609                goto ignore;
 610
 611        if (reply && (rc->rc_seq_reply != seq))
 612                goto ignore;
 613
 614        if (!(status & DLM_RS_NODES) && (names || lookup || lock))
 615                goto ignore;
 616
 617        if (!(status & DLM_RS_DIR) && (lookup || lock))
 618                goto ignore;
 619
 620        switch (rc->rc_type) {
 621        case DLM_RCOM_STATUS:
 622                receive_rcom_status(ls, rc);
 623                break;
 624
 625        case DLM_RCOM_NAMES:
 626                receive_rcom_names(ls, rc);
 627                break;
 628
 629        case DLM_RCOM_LOOKUP:
 630                receive_rcom_lookup(ls, rc);
 631                break;
 632
 633        case DLM_RCOM_LOCK:
 634                if (rc->rc_header.h_length < lock_size)
 635                        goto Eshort;
 636                receive_rcom_lock(ls, rc);
 637                break;
 638
 639        case DLM_RCOM_STATUS_REPLY:
 640                receive_sync_reply(ls, rc);
 641                break;
 642
 643        case DLM_RCOM_NAMES_REPLY:
 644                receive_sync_reply(ls, rc);
 645                break;
 646
 647        case DLM_RCOM_LOOKUP_REPLY:
 648                receive_rcom_lookup_reply(ls, rc);
 649                break;
 650
 651        case DLM_RCOM_LOCK_REPLY:
 652                if (rc->rc_header.h_length < lock_size)
 653                        goto Eshort;
 654                dlm_recover_process_copy(ls, rc);
 655                break;
 656
 657        default:
 658                log_error(ls, "receive_rcom bad type %d", rc->rc_type);
 659        }
 660        return;
 661
 662ignore:
 663        log_limit(ls, "dlm_receive_rcom ignore msg %d "
 664                  "from %d %llu %llu recover seq %llu sts %x gen %u",
 665                   rc->rc_type,
 666                   nodeid,
 667                   (unsigned long long)rc->rc_seq,
 668                   (unsigned long long)rc->rc_seq_reply,
 669                   (unsigned long long)seq,
 670                   status, ls->ls_generation);
 671        return;
 672Eshort:
 673        log_error(ls, "recovery message %d from %d is too short",
 674                  rc->rc_type, nodeid);
 675}
 676
 677