linux/fs/dlm/rcom.c
<<
>>
Prefs
   1/******************************************************************************
   2*******************************************************************************
   3**
   4**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
   5**  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
   6**
   7**  This copyrighted material is made available to anyone wishing to use,
   8**  modify, copy, or redistribute it subject to the terms and conditions
   9**  of the GNU General Public License v.2.
  10**
  11*******************************************************************************
  12******************************************************************************/
  13
  14#include "dlm_internal.h"
  15#include "lockspace.h"
  16#include "member.h"
  17#include "lowcomms.h"
  18#include "midcomms.h"
  19#include "rcom.h"
  20#include "recover.h"
  21#include "dir.h"
  22#include "config.h"
  23#include "memory.h"
  24#include "lock.h"
  25#include "util.h"
  26
  27static int rcom_response(struct dlm_ls *ls)
  28{
  29        return test_bit(LSFL_RCOM_READY, &ls->ls_flags);
  30}
  31
  32static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
  33                       struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret)
  34{
  35        struct dlm_rcom *rc;
  36        struct dlm_mhandle *mh;
  37        char *mb;
  38        int mb_len = sizeof(struct dlm_rcom) + len;
  39
  40        mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
  41        if (!mh) {
  42                log_print("create_rcom to %d type %d len %d ENOBUFS",
  43                          to_nodeid, type, len);
  44                return -ENOBUFS;
  45        }
  46        memset(mb, 0, mb_len);
  47
  48        rc = (struct dlm_rcom *) mb;
  49
  50        rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
  51        rc->rc_header.h_lockspace = ls->ls_global_id;
  52        rc->rc_header.h_nodeid = dlm_our_nodeid();
  53        rc->rc_header.h_length = mb_len;
  54        rc->rc_header.h_cmd = DLM_RCOM;
  55
  56        rc->rc_type = type;
  57
  58        spin_lock(&ls->ls_recover_lock);
  59        rc->rc_seq = ls->ls_recover_seq;
  60        spin_unlock(&ls->ls_recover_lock);
  61
  62        *mh_ret = mh;
  63        *rc_ret = rc;
  64        return 0;
  65}
  66
  67static void send_rcom(struct dlm_ls *ls, struct dlm_mhandle *mh,
  68                      struct dlm_rcom *rc)
  69{
  70        dlm_rcom_out(rc);
  71        dlm_lowcomms_commit_buffer(mh);
  72}
  73
  74static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs,
  75                            uint32_t flags)
  76{
  77        rs->rs_flags = cpu_to_le32(flags);
  78}
  79
  80/* When replying to a status request, a node also sends back its
  81   configuration values.  The requesting node then checks that the remote
  82   node is configured the same way as itself. */
  83
  84static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf,
  85                            uint32_t num_slots)
  86{
  87        rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen);
  88        rf->rf_lsflags = cpu_to_le32(ls->ls_exflags);
  89
  90        rf->rf_our_slot = cpu_to_le16(ls->ls_slot);
  91        rf->rf_num_slots = cpu_to_le16(num_slots);
  92        rf->rf_generation =  cpu_to_le32(ls->ls_generation);
  93}
  94
  95static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
  96{
  97        struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
  98
  99        if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) {
 100                log_error(ls, "version mismatch: %x nodeid %d: %x",
 101                          DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid,
 102                          rc->rc_header.h_version);
 103                return -EPROTO;
 104        }
 105
 106        if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen ||
 107            le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) {
 108                log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
 109                          ls->ls_lvblen, ls->ls_exflags, nodeid,
 110                          le32_to_cpu(rf->rf_lvblen),
 111                          le32_to_cpu(rf->rf_lsflags));
 112                return -EPROTO;
 113        }
 114        return 0;
 115}
 116
 117static void allow_sync_reply(struct dlm_ls *ls, uint64_t *new_seq)
 118{
 119        spin_lock(&ls->ls_rcom_spin);
 120        *new_seq = ++ls->ls_rcom_seq;
 121        set_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
 122        spin_unlock(&ls->ls_rcom_spin);
 123}
 124
 125static void disallow_sync_reply(struct dlm_ls *ls)
 126{
 127        spin_lock(&ls->ls_rcom_spin);
 128        clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
 129        clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
 130        spin_unlock(&ls->ls_rcom_spin);
 131}
 132
 133/*
 134 * low nodeid gathers one slot value at a time from each node.
 135 * it sets need_slots=0, and saves rf_our_slot returned from each
 136 * rcom_config.
 137 *
 138 * other nodes gather all slot values at once from the low nodeid.
 139 * they set need_slots=1, and ignore the rf_our_slot returned from each
 140 * rcom_config.  they use the rf_num_slots returned from the low
 141 * node's rcom_config.
 142 */
 143
 144int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
 145{
 146        struct dlm_rcom *rc;
 147        struct dlm_mhandle *mh;
 148        int error = 0;
 149
 150        ls->ls_recover_nodeid = nodeid;
 151
 152        if (nodeid == dlm_our_nodeid()) {
 153                rc = ls->ls_recover_buf;
 154                rc->rc_result = dlm_recover_status(ls);
 155                goto out;
 156        }
 157
 158        error = create_rcom(ls, nodeid, DLM_RCOM_STATUS,
 159                            sizeof(struct rcom_status), &rc, &mh);
 160        if (error)
 161                goto out;
 162
 163        set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags);
 164
 165        allow_sync_reply(ls, &rc->rc_id);
 166        memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
 167
 168        send_rcom(ls, mh, rc);
 169
 170        error = dlm_wait_function(ls, &rcom_response);
 171        disallow_sync_reply(ls);
 172        if (error)
 173                goto out;
 174
 175        rc = ls->ls_recover_buf;
 176
 177        if (rc->rc_result == -ESRCH) {
 178                /* we pretend the remote lockspace exists with 0 status */
 179                log_debug(ls, "remote node %d not ready", nodeid);
 180                rc->rc_result = 0;
 181                error = 0;
 182        } else {
 183                error = check_rcom_config(ls, rc, nodeid);
 184        }
 185
 186        /* the caller looks at rc_result for the remote recovery status */
 187 out:
 188        return error;
 189}
 190
 191static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 192{
 193        struct dlm_rcom *rc;
 194        struct dlm_mhandle *mh;
 195        struct rcom_status *rs;
 196        uint32_t status;
 197        int nodeid = rc_in->rc_header.h_nodeid;
 198        int len = sizeof(struct rcom_config);
 199        int num_slots = 0;
 200        int error;
 201
 202        if (!dlm_slots_version(&rc_in->rc_header)) {
 203                status = dlm_recover_status(ls);
 204                goto do_create;
 205        }
 206
 207        rs = (struct rcom_status *)rc_in->rc_buf;
 208
 209        if (!(le32_to_cpu(rs->rs_flags) & DLM_RSF_NEED_SLOTS)) {
 210                status = dlm_recover_status(ls);
 211                goto do_create;
 212        }
 213
 214        spin_lock(&ls->ls_recover_lock);
 215        status = ls->ls_recover_status;
 216        num_slots = ls->ls_num_slots;
 217        spin_unlock(&ls->ls_recover_lock);
 218        len += num_slots * sizeof(struct rcom_slot);
 219
 220 do_create:
 221        error = create_rcom(ls, nodeid, DLM_RCOM_STATUS_REPLY,
 222                            len, &rc, &mh);
 223        if (error)
 224                return;
 225
 226        rc->rc_id = rc_in->rc_id;
 227        rc->rc_seq_reply = rc_in->rc_seq;
 228        rc->rc_result = status;
 229
 230        set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots);
 231
 232        if (!num_slots)
 233                goto do_send;
 234
 235        spin_lock(&ls->ls_recover_lock);
 236        if (ls->ls_num_slots != num_slots) {
 237                spin_unlock(&ls->ls_recover_lock);
 238                log_debug(ls, "receive_rcom_status num_slots %d to %d",
 239                          num_slots, ls->ls_num_slots);
 240                rc->rc_result = 0;
 241                set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0);
 242                goto do_send;
 243        }
 244
 245        dlm_slots_copy_out(ls, rc);
 246        spin_unlock(&ls->ls_recover_lock);
 247
 248 do_send:
 249        send_rcom(ls, mh, rc);
 250}
 251
 252static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 253{
 254        spin_lock(&ls->ls_rcom_spin);
 255        if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
 256            rc_in->rc_id != ls->ls_rcom_seq) {
 257                log_debug(ls, "reject reply %d from %d seq %llx expect %llx",
 258                          rc_in->rc_type, rc_in->rc_header.h_nodeid,
 259                          (unsigned long long)rc_in->rc_id,
 260                          (unsigned long long)ls->ls_rcom_seq);
 261                goto out;
 262        }
 263        memcpy(ls->ls_recover_buf, rc_in, rc_in->rc_header.h_length);
 264        set_bit(LSFL_RCOM_READY, &ls->ls_flags);
 265        clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
 266        wake_up(&ls->ls_wait_general);
 267 out:
 268        spin_unlock(&ls->ls_rcom_spin);
 269}
 270
 271int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
 272{
 273        struct dlm_rcom *rc;
 274        struct dlm_mhandle *mh;
 275        int error = 0;
 276
 277        ls->ls_recover_nodeid = nodeid;
 278
 279        error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh);
 280        if (error)
 281                goto out;
 282        memcpy(rc->rc_buf, last_name, last_len);
 283
 284        allow_sync_reply(ls, &rc->rc_id);
 285        memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
 286
 287        send_rcom(ls, mh, rc);
 288
 289        error = dlm_wait_function(ls, &rcom_response);
 290        disallow_sync_reply(ls);
 291 out:
 292        return error;
 293}
 294
 295static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 296{
 297        struct dlm_rcom *rc;
 298        struct dlm_mhandle *mh;
 299        int error, inlen, outlen, nodeid;
 300
 301        nodeid = rc_in->rc_header.h_nodeid;
 302        inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
 303        outlen = dlm_config.ci_buffer_size - sizeof(struct dlm_rcom);
 304
 305        error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, &rc, &mh);
 306        if (error)
 307                return;
 308        rc->rc_id = rc_in->rc_id;
 309        rc->rc_seq_reply = rc_in->rc_seq;
 310
 311        dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
 312                              nodeid);
 313        send_rcom(ls, mh, rc);
 314}
 315
 316int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
 317{
 318        struct dlm_rcom *rc;
 319        struct dlm_mhandle *mh;
 320        struct dlm_ls *ls = r->res_ls;
 321        int error;
 322
 323        error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length,
 324                            &rc, &mh);
 325        if (error)
 326                goto out;
 327        memcpy(rc->rc_buf, r->res_name, r->res_length);
 328        rc->rc_id = (unsigned long) r->res_id;
 329
 330        send_rcom(ls, mh, rc);
 331 out:
 332        return error;
 333}
 334
 335int dlm_send_rcom_lookup_dump(struct dlm_rsb *r, int to_nodeid)
 336{
 337        struct dlm_rcom *rc;
 338        struct dlm_mhandle *mh;
 339        struct dlm_ls *ls = r->res_ls;
 340        int error;
 341
 342        error = create_rcom(ls, to_nodeid, DLM_RCOM_LOOKUP, r->res_length,
 343                            &rc, &mh);
 344        if (error)
 345                goto out;
 346        memcpy(rc->rc_buf, r->res_name, r->res_length);
 347        rc->rc_id = 0xFFFFFFFF;
 348
 349        send_rcom(ls, mh, rc);
 350 out:
 351        return error;
 352}
 353
 354static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 355{
 356        struct dlm_rcom *rc;
 357        struct dlm_mhandle *mh;
 358        int error, ret_nodeid, nodeid = rc_in->rc_header.h_nodeid;
 359        int len = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
 360
 361        error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh);
 362        if (error)
 363                return;
 364
 365        if (rc_in->rc_id == 0xFFFFFFFF) {
 366                log_error(ls, "receive_rcom_lookup dump from %d", nodeid);
 367                dlm_dump_rsb_name(ls, rc_in->rc_buf, len);
 368                return;
 369        }
 370
 371        error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len,
 372                                  DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL);
 373        if (error)
 374                ret_nodeid = error;
 375        rc->rc_result = ret_nodeid;
 376        rc->rc_id = rc_in->rc_id;
 377        rc->rc_seq_reply = rc_in->rc_seq;
 378
 379        send_rcom(ls, mh, rc);
 380}
 381
 382static void receive_rcom_lookup_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 383{
 384        dlm_recover_master_reply(ls, rc_in);
 385}
 386
 387static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb,
 388                           struct rcom_lock *rl)
 389{
 390        memset(rl, 0, sizeof(*rl));
 391
 392        rl->rl_ownpid = cpu_to_le32(lkb->lkb_ownpid);
 393        rl->rl_lkid = cpu_to_le32(lkb->lkb_id);
 394        rl->rl_exflags = cpu_to_le32(lkb->lkb_exflags);
 395        rl->rl_flags = cpu_to_le32(lkb->lkb_flags);
 396        rl->rl_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
 397        rl->rl_rqmode = lkb->lkb_rqmode;
 398        rl->rl_grmode = lkb->lkb_grmode;
 399        rl->rl_status = lkb->lkb_status;
 400        rl->rl_wait_type = cpu_to_le16(lkb->lkb_wait_type);
 401
 402        if (lkb->lkb_bastfn)
 403                rl->rl_asts |= DLM_CB_BAST;
 404        if (lkb->lkb_astfn)
 405                rl->rl_asts |= DLM_CB_CAST;
 406
 407        rl->rl_namelen = cpu_to_le16(r->res_length);
 408        memcpy(rl->rl_name, r->res_name, r->res_length);
 409
 410        /* FIXME: might we have an lvb without DLM_LKF_VALBLK set ?
 411           If so, receive_rcom_lock_args() won't take this copy. */
 412
 413        if (lkb->lkb_lvbptr)
 414                memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
 415}
 416
 417int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
 418{
 419        struct dlm_ls *ls = r->res_ls;
 420        struct dlm_rcom *rc;
 421        struct dlm_mhandle *mh;
 422        struct rcom_lock *rl;
 423        int error, len = sizeof(struct rcom_lock);
 424
 425        if (lkb->lkb_lvbptr)
 426                len += ls->ls_lvblen;
 427
 428        error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh);
 429        if (error)
 430                goto out;
 431
 432        rl = (struct rcom_lock *) rc->rc_buf;
 433        pack_rcom_lock(r, lkb, rl);
 434        rc->rc_id = (unsigned long) r;
 435
 436        send_rcom(ls, mh, rc);
 437 out:
 438        return error;
 439}
 440
 441/* needs at least dlm_rcom + rcom_lock */
 442static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 443{
 444        struct dlm_rcom *rc;
 445        struct dlm_mhandle *mh;
 446        int error, nodeid = rc_in->rc_header.h_nodeid;
 447
 448        dlm_recover_master_copy(ls, rc_in);
 449
 450        error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY,
 451                            sizeof(struct rcom_lock), &rc, &mh);
 452        if (error)
 453                return;
 454
 455        /* We send back the same rcom_lock struct we received, but
 456           dlm_recover_master_copy() has filled in rl_remid and rl_result */
 457
 458        memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
 459        rc->rc_id = rc_in->rc_id;
 460        rc->rc_seq_reply = rc_in->rc_seq;
 461
 462        send_rcom(ls, mh, rc);
 463}
 464
 465/* If the lockspace doesn't exist then still send a status message
 466   back; it's possible that it just doesn't have its global_id yet. */
 467
 468int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
 469{
 470        struct dlm_rcom *rc;
 471        struct rcom_config *rf;
 472        struct dlm_mhandle *mh;
 473        char *mb;
 474        int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
 475
 476        mh = dlm_lowcomms_get_buffer(nodeid, mb_len, GFP_NOFS, &mb);
 477        if (!mh)
 478                return -ENOBUFS;
 479        memset(mb, 0, mb_len);
 480
 481        rc = (struct dlm_rcom *) mb;
 482
 483        rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
 484        rc->rc_header.h_lockspace = rc_in->rc_header.h_lockspace;
 485        rc->rc_header.h_nodeid = dlm_our_nodeid();
 486        rc->rc_header.h_length = mb_len;
 487        rc->rc_header.h_cmd = DLM_RCOM;
 488
 489        rc->rc_type = DLM_RCOM_STATUS_REPLY;
 490        rc->rc_id = rc_in->rc_id;
 491        rc->rc_seq_reply = rc_in->rc_seq;
 492        rc->rc_result = -ESRCH;
 493
 494        rf = (struct rcom_config *) rc->rc_buf;
 495        rf->rf_lvblen = cpu_to_le32(~0U);
 496
 497        dlm_rcom_out(rc);
 498        dlm_lowcomms_commit_buffer(mh);
 499
 500        return 0;
 501}
 502
 503/*
 504 * Ignore messages for stage Y before we set
 505 * recover_status bit for stage X:
 506 *
 507 * recover_status = 0
 508 *
 509 * dlm_recover_members()
 510 * - send nothing
 511 * - recv nothing
 512 * - ignore NAMES, NAMES_REPLY
 513 * - ignore LOOKUP, LOOKUP_REPLY
 514 * - ignore LOCK, LOCK_REPLY
 515 *
 516 * recover_status |= NODES
 517 *
 518 * dlm_recover_members_wait()
 519 *
 520 * dlm_recover_directory()
 521 * - send NAMES
 522 * - recv NAMES_REPLY
 523 * - ignore LOOKUP, LOOKUP_REPLY
 524 * - ignore LOCK, LOCK_REPLY
 525 *
 526 * recover_status |= DIR
 527 *
 528 * dlm_recover_directory_wait()
 529 *
 530 * dlm_recover_masters()
 531 * - send LOOKUP
 532 * - recv LOOKUP_REPLY
 533 *
 534 * dlm_recover_locks()
 535 * - send LOCKS
 536 * - recv LOCKS_REPLY
 537 *
 538 * recover_status |= LOCKS
 539 *
 540 * dlm_recover_locks_wait()
 541 *
 542 * recover_status |= DONE
 543 */
 544
 545/* Called by dlm_recv; corresponds to dlm_receive_message() but special
 546   recovery-only comms are sent through here. */
 547
 548void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
 549{
 550        int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
 551        int stop, reply = 0, names = 0, lookup = 0, lock = 0;
 552        uint32_t status;
 553        uint64_t seq;
 554
 555        switch (rc->rc_type) {
 556        case DLM_RCOM_STATUS_REPLY:
 557                reply = 1;
 558                break;
 559        case DLM_RCOM_NAMES:
 560                names = 1;
 561                break;
 562        case DLM_RCOM_NAMES_REPLY:
 563                names = 1;
 564                reply = 1;
 565                break;
 566        case DLM_RCOM_LOOKUP:
 567                lookup = 1;
 568                break;
 569        case DLM_RCOM_LOOKUP_REPLY:
 570                lookup = 1;
 571                reply = 1;
 572                break;
 573        case DLM_RCOM_LOCK:
 574                lock = 1;
 575                break;
 576        case DLM_RCOM_LOCK_REPLY:
 577                lock = 1;
 578                reply = 1;
 579                break;
 580        };
 581
 582        spin_lock(&ls->ls_recover_lock);
 583        status = ls->ls_recover_status;
 584        stop = test_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
 585        seq = ls->ls_recover_seq;
 586        spin_unlock(&ls->ls_recover_lock);
 587
 588        if (stop && (rc->rc_type != DLM_RCOM_STATUS))
 589                goto ignore;
 590
 591        if (reply && (rc->rc_seq_reply != seq))
 592                goto ignore;
 593
 594        if (!(status & DLM_RS_NODES) && (names || lookup || lock))
 595                goto ignore;
 596
 597        if (!(status & DLM_RS_DIR) && (lookup || lock))
 598                goto ignore;
 599
 600        switch (rc->rc_type) {
 601        case DLM_RCOM_STATUS:
 602                receive_rcom_status(ls, rc);
 603                break;
 604
 605        case DLM_RCOM_NAMES:
 606                receive_rcom_names(ls, rc);
 607                break;
 608
 609        case DLM_RCOM_LOOKUP:
 610                receive_rcom_lookup(ls, rc);
 611                break;
 612
 613        case DLM_RCOM_LOCK:
 614                if (rc->rc_header.h_length < lock_size)
 615                        goto Eshort;
 616                receive_rcom_lock(ls, rc);
 617                break;
 618
 619        case DLM_RCOM_STATUS_REPLY:
 620                receive_sync_reply(ls, rc);
 621                break;
 622
 623        case DLM_RCOM_NAMES_REPLY:
 624                receive_sync_reply(ls, rc);
 625                break;
 626
 627        case DLM_RCOM_LOOKUP_REPLY:
 628                receive_rcom_lookup_reply(ls, rc);
 629                break;
 630
 631        case DLM_RCOM_LOCK_REPLY:
 632                if (rc->rc_header.h_length < lock_size)
 633                        goto Eshort;
 634                dlm_recover_process_copy(ls, rc);
 635                break;
 636
 637        default:
 638                log_error(ls, "receive_rcom bad type %d", rc->rc_type);
 639        }
 640        return;
 641
 642ignore:
 643        log_limit(ls, "dlm_receive_rcom ignore msg %d "
 644                  "from %d %llu %llu recover seq %llu sts %x gen %u",
 645                   rc->rc_type,
 646                   nodeid,
 647                   (unsigned long long)rc->rc_seq,
 648                   (unsigned long long)rc->rc_seq_reply,
 649                   (unsigned long long)seq,
 650                   status, ls->ls_generation);
 651        return;
 652Eshort:
 653        log_error(ls, "recovery message %d from %d is too short",
 654                  rc->rc_type, nodeid);
 655}
 656
 657