linux/fs/dlm/rcom.c
<<
>>
Prefs
   1/******************************************************************************
   2*******************************************************************************
   3**
   4**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
   5**  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
   6**
   7**  This copyrighted material is made available to anyone wishing to use,
   8**  modify, copy, or redistribute it subject to the terms and conditions
   9**  of the GNU General Public License v.2.
  10**
  11*******************************************************************************
  12******************************************************************************/
  13
  14#include "dlm_internal.h"
  15#include "lockspace.h"
  16#include "member.h"
  17#include "lowcomms.h"
  18#include "midcomms.h"
  19#include "rcom.h"
  20#include "recover.h"
  21#include "dir.h"
  22#include "config.h"
  23#include "memory.h"
  24#include "lock.h"
  25#include "util.h"
  26
  27
  28static int rcom_response(struct dlm_ls *ls)
  29{
  30        return test_bit(LSFL_RCOM_READY, &ls->ls_flags);
  31}
  32
  33static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
  34                       struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret)
  35{
  36        struct dlm_rcom *rc;
  37        struct dlm_mhandle *mh;
  38        char *mb;
  39        int mb_len = sizeof(struct dlm_rcom) + len;
  40
  41        mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, ls->ls_allocation, &mb);
  42        if (!mh) {
  43                log_print("create_rcom to %d type %d len %d ENOBUFS",
  44                          to_nodeid, type, len);
  45                return -ENOBUFS;
  46        }
  47        memset(mb, 0, mb_len);
  48
  49        rc = (struct dlm_rcom *) mb;
  50
  51        rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
  52        rc->rc_header.h_lockspace = ls->ls_global_id;
  53        rc->rc_header.h_nodeid = dlm_our_nodeid();
  54        rc->rc_header.h_length = mb_len;
  55        rc->rc_header.h_cmd = DLM_RCOM;
  56
  57        rc->rc_type = type;
  58
  59        spin_lock(&ls->ls_recover_lock);
  60        rc->rc_seq = ls->ls_recover_seq;
  61        spin_unlock(&ls->ls_recover_lock);
  62
  63        *mh_ret = mh;
  64        *rc_ret = rc;
  65        return 0;
  66}
  67
  68static void send_rcom(struct dlm_ls *ls, struct dlm_mhandle *mh,
  69                      struct dlm_rcom *rc)
  70{
  71        dlm_rcom_out(rc);
  72        dlm_lowcomms_commit_buffer(mh);
  73}
  74
  75/* When replying to a status request, a node also sends back its
  76   configuration values.  The requesting node then checks that the remote
  77   node is configured the same way as itself. */
  78
  79static void make_config(struct dlm_ls *ls, struct rcom_config *rf)
  80{
  81        rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen);
  82        rf->rf_lsflags = cpu_to_le32(ls->ls_exflags);
  83}
  84
  85static int check_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
  86{
  87        struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
  88        size_t conf_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
  89
  90        if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) {
  91                log_error(ls, "version mismatch: %x nodeid %d: %x",
  92                          DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid,
  93                          rc->rc_header.h_version);
  94                return -EPROTO;
  95        }
  96
  97        if (rc->rc_header.h_length < conf_size) {
  98                log_error(ls, "config too short: %d nodeid %d",
  99                          rc->rc_header.h_length, nodeid);
 100                return -EPROTO;
 101        }
 102
 103        if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen ||
 104            le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) {
 105                log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
 106                          ls->ls_lvblen, ls->ls_exflags, nodeid,
 107                          le32_to_cpu(rf->rf_lvblen),
 108                          le32_to_cpu(rf->rf_lsflags));
 109                return -EPROTO;
 110        }
 111        return 0;
 112}
 113
 114static void allow_sync_reply(struct dlm_ls *ls, uint64_t *new_seq)
 115{
 116        spin_lock(&ls->ls_rcom_spin);
 117        *new_seq = ++ls->ls_rcom_seq;
 118        set_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
 119        spin_unlock(&ls->ls_rcom_spin);
 120}
 121
 122static void disallow_sync_reply(struct dlm_ls *ls)
 123{
 124        spin_lock(&ls->ls_rcom_spin);
 125        clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
 126        clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
 127        spin_unlock(&ls->ls_rcom_spin);
 128}
 129
 130int dlm_rcom_status(struct dlm_ls *ls, int nodeid)
 131{
 132        struct dlm_rcom *rc;
 133        struct dlm_mhandle *mh;
 134        int error = 0;
 135
 136        ls->ls_recover_nodeid = nodeid;
 137
 138        if (nodeid == dlm_our_nodeid()) {
 139                rc = ls->ls_recover_buf;
 140                rc->rc_result = dlm_recover_status(ls);
 141                goto out;
 142        }
 143
 144        error = create_rcom(ls, nodeid, DLM_RCOM_STATUS, 0, &rc, &mh);
 145        if (error)
 146                goto out;
 147
 148        allow_sync_reply(ls, &rc->rc_id);
 149        memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
 150
 151        send_rcom(ls, mh, rc);
 152
 153        error = dlm_wait_function(ls, &rcom_response);
 154        disallow_sync_reply(ls);
 155        if (error)
 156                goto out;
 157
 158        rc = ls->ls_recover_buf;
 159
 160        if (rc->rc_result == -ESRCH) {
 161                /* we pretend the remote lockspace exists with 0 status */
 162                log_debug(ls, "remote node %d not ready", nodeid);
 163                rc->rc_result = 0;
 164        } else
 165                error = check_config(ls, rc, nodeid);
 166        /* the caller looks at rc_result for the remote recovery status */
 167 out:
 168        return error;
 169}
 170
 171static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 172{
 173        struct dlm_rcom *rc;
 174        struct dlm_mhandle *mh;
 175        int error, nodeid = rc_in->rc_header.h_nodeid;
 176
 177        error = create_rcom(ls, nodeid, DLM_RCOM_STATUS_REPLY,
 178                            sizeof(struct rcom_config), &rc, &mh);
 179        if (error)
 180                return;
 181        rc->rc_id = rc_in->rc_id;
 182        rc->rc_seq_reply = rc_in->rc_seq;
 183        rc->rc_result = dlm_recover_status(ls);
 184        make_config(ls, (struct rcom_config *) rc->rc_buf);
 185
 186        send_rcom(ls, mh, rc);
 187}
 188
 189static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 190{
 191        spin_lock(&ls->ls_rcom_spin);
 192        if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
 193            rc_in->rc_id != ls->ls_rcom_seq) {
 194                log_debug(ls, "reject reply %d from %d seq %llx expect %llx",
 195                          rc_in->rc_type, rc_in->rc_header.h_nodeid,
 196                          (unsigned long long)rc_in->rc_id,
 197                          (unsigned long long)ls->ls_rcom_seq);
 198                goto out;
 199        }
 200        memcpy(ls->ls_recover_buf, rc_in, rc_in->rc_header.h_length);
 201        set_bit(LSFL_RCOM_READY, &ls->ls_flags);
 202        clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
 203        wake_up(&ls->ls_wait_general);
 204 out:
 205        spin_unlock(&ls->ls_rcom_spin);
 206}
 207
 208int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
 209{
 210        struct dlm_rcom *rc;
 211        struct dlm_mhandle *mh;
 212        int error = 0;
 213        int max_size = dlm_config.ci_buffer_size - sizeof(struct dlm_rcom);
 214
 215        ls->ls_recover_nodeid = nodeid;
 216
 217        if (nodeid == dlm_our_nodeid()) {
 218                ls->ls_recover_buf->rc_header.h_length =
 219                        dlm_config.ci_buffer_size;
 220                dlm_copy_master_names(ls, last_name, last_len,
 221                                      ls->ls_recover_buf->rc_buf,
 222                                      max_size, nodeid);
 223                goto out;
 224        }
 225
 226        error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh);
 227        if (error)
 228                goto out;
 229        memcpy(rc->rc_buf, last_name, last_len);
 230
 231        allow_sync_reply(ls, &rc->rc_id);
 232        memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
 233
 234        send_rcom(ls, mh, rc);
 235
 236        error = dlm_wait_function(ls, &rcom_response);
 237        disallow_sync_reply(ls);
 238 out:
 239        return error;
 240}
 241
 242static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 243{
 244        struct dlm_rcom *rc;
 245        struct dlm_mhandle *mh;
 246        int error, inlen, outlen, nodeid;
 247
 248        nodeid = rc_in->rc_header.h_nodeid;
 249        inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
 250        outlen = dlm_config.ci_buffer_size - sizeof(struct dlm_rcom);
 251
 252        error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, &rc, &mh);
 253        if (error)
 254                return;
 255        rc->rc_id = rc_in->rc_id;
 256        rc->rc_seq_reply = rc_in->rc_seq;
 257
 258        dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
 259                              nodeid);
 260        send_rcom(ls, mh, rc);
 261}
 262
 263int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
 264{
 265        struct dlm_rcom *rc;
 266        struct dlm_mhandle *mh;
 267        struct dlm_ls *ls = r->res_ls;
 268        int error;
 269
 270        error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length,
 271                            &rc, &mh);
 272        if (error)
 273                goto out;
 274        memcpy(rc->rc_buf, r->res_name, r->res_length);
 275        rc->rc_id = (unsigned long) r;
 276
 277        send_rcom(ls, mh, rc);
 278 out:
 279        return error;
 280}
 281
 282static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 283{
 284        struct dlm_rcom *rc;
 285        struct dlm_mhandle *mh;
 286        int error, ret_nodeid, nodeid = rc_in->rc_header.h_nodeid;
 287        int len = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
 288
 289        error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh);
 290        if (error)
 291                return;
 292
 293        error = dlm_dir_lookup(ls, nodeid, rc_in->rc_buf, len, &ret_nodeid);
 294        if (error)
 295                ret_nodeid = error;
 296        rc->rc_result = ret_nodeid;
 297        rc->rc_id = rc_in->rc_id;
 298        rc->rc_seq_reply = rc_in->rc_seq;
 299
 300        send_rcom(ls, mh, rc);
 301}
 302
 303static void receive_rcom_lookup_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 304{
 305        dlm_recover_master_reply(ls, rc_in);
 306}
 307
 308static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb,
 309                           struct rcom_lock *rl)
 310{
 311        memset(rl, 0, sizeof(*rl));
 312
 313        rl->rl_ownpid = cpu_to_le32(lkb->lkb_ownpid);
 314        rl->rl_lkid = cpu_to_le32(lkb->lkb_id);
 315        rl->rl_exflags = cpu_to_le32(lkb->lkb_exflags);
 316        rl->rl_flags = cpu_to_le32(lkb->lkb_flags);
 317        rl->rl_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
 318        rl->rl_rqmode = lkb->lkb_rqmode;
 319        rl->rl_grmode = lkb->lkb_grmode;
 320        rl->rl_status = lkb->lkb_status;
 321        rl->rl_wait_type = cpu_to_le16(lkb->lkb_wait_type);
 322
 323        if (lkb->lkb_bastfn)
 324                rl->rl_asts |= AST_BAST;
 325        if (lkb->lkb_astfn)
 326                rl->rl_asts |= AST_COMP;
 327
 328        rl->rl_namelen = cpu_to_le16(r->res_length);
 329        memcpy(rl->rl_name, r->res_name, r->res_length);
 330
 331        /* FIXME: might we have an lvb without DLM_LKF_VALBLK set ?
 332           If so, receive_rcom_lock_args() won't take this copy. */
 333
 334        if (lkb->lkb_lvbptr)
 335                memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
 336}
 337
 338int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
 339{
 340        struct dlm_ls *ls = r->res_ls;
 341        struct dlm_rcom *rc;
 342        struct dlm_mhandle *mh;
 343        struct rcom_lock *rl;
 344        int error, len = sizeof(struct rcom_lock);
 345
 346        if (lkb->lkb_lvbptr)
 347                len += ls->ls_lvblen;
 348
 349        error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh);
 350        if (error)
 351                goto out;
 352
 353        rl = (struct rcom_lock *) rc->rc_buf;
 354        pack_rcom_lock(r, lkb, rl);
 355        rc->rc_id = (unsigned long) r;
 356
 357        send_rcom(ls, mh, rc);
 358 out:
 359        return error;
 360}
 361
 362/* needs at least dlm_rcom + rcom_lock */
 363static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 364{
 365        struct dlm_rcom *rc;
 366        struct dlm_mhandle *mh;
 367        int error, nodeid = rc_in->rc_header.h_nodeid;
 368
 369        dlm_recover_master_copy(ls, rc_in);
 370
 371        error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY,
 372                            sizeof(struct rcom_lock), &rc, &mh);
 373        if (error)
 374                return;
 375
 376        /* We send back the same rcom_lock struct we received, but
 377           dlm_recover_master_copy() has filled in rl_remid and rl_result */
 378
 379        memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
 380        rc->rc_id = rc_in->rc_id;
 381        rc->rc_seq_reply = rc_in->rc_seq;
 382
 383        send_rcom(ls, mh, rc);
 384}
 385
 386/* If the lockspace doesn't exist then still send a status message
 387   back; it's possible that it just doesn't have its global_id yet. */
 388
 389int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
 390{
 391        struct dlm_rcom *rc;
 392        struct rcom_config *rf;
 393        struct dlm_mhandle *mh;
 394        char *mb;
 395        int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
 396
 397        mh = dlm_lowcomms_get_buffer(nodeid, mb_len, GFP_NOFS, &mb);
 398        if (!mh)
 399                return -ENOBUFS;
 400        memset(mb, 0, mb_len);
 401
 402        rc = (struct dlm_rcom *) mb;
 403
 404        rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
 405        rc->rc_header.h_lockspace = rc_in->rc_header.h_lockspace;
 406        rc->rc_header.h_nodeid = dlm_our_nodeid();
 407        rc->rc_header.h_length = mb_len;
 408        rc->rc_header.h_cmd = DLM_RCOM;
 409
 410        rc->rc_type = DLM_RCOM_STATUS_REPLY;
 411        rc->rc_id = rc_in->rc_id;
 412        rc->rc_seq_reply = rc_in->rc_seq;
 413        rc->rc_result = -ESRCH;
 414
 415        rf = (struct rcom_config *) rc->rc_buf;
 416        rf->rf_lvblen = cpu_to_le32(~0U);
 417
 418        dlm_rcom_out(rc);
 419        dlm_lowcomms_commit_buffer(mh);
 420
 421        return 0;
 422}
 423
 424static int is_old_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
 425{
 426        uint64_t seq;
 427        int rv = 0;
 428
 429        switch (rc->rc_type) {
 430        case DLM_RCOM_STATUS_REPLY:
 431        case DLM_RCOM_NAMES_REPLY:
 432        case DLM_RCOM_LOOKUP_REPLY:
 433        case DLM_RCOM_LOCK_REPLY:
 434                spin_lock(&ls->ls_recover_lock);
 435                seq = ls->ls_recover_seq;
 436                spin_unlock(&ls->ls_recover_lock);
 437                if (rc->rc_seq_reply != seq) {
 438                        log_debug(ls, "ignoring old reply %x from %d "
 439                                      "seq_reply %llx expect %llx",
 440                                      rc->rc_type, rc->rc_header.h_nodeid,
 441                                      (unsigned long long)rc->rc_seq_reply,
 442                                      (unsigned long long)seq);
 443                        rv = 1;
 444                }
 445        }
 446        return rv;
 447}
 448
 449/* Called by dlm_recv; corresponds to dlm_receive_message() but special
 450   recovery-only comms are sent through here. */
 451
 452void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
 453{
 454        int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
 455
 456        if (dlm_recovery_stopped(ls) && (rc->rc_type != DLM_RCOM_STATUS)) {
 457                log_debug(ls, "ignoring recovery message %x from %d",
 458                          rc->rc_type, nodeid);
 459                goto out;
 460        }
 461
 462        if (is_old_reply(ls, rc))
 463                goto out;
 464
 465        switch (rc->rc_type) {
 466        case DLM_RCOM_STATUS:
 467                receive_rcom_status(ls, rc);
 468                break;
 469
 470        case DLM_RCOM_NAMES:
 471                receive_rcom_names(ls, rc);
 472                break;
 473
 474        case DLM_RCOM_LOOKUP:
 475                receive_rcom_lookup(ls, rc);
 476                break;
 477
 478        case DLM_RCOM_LOCK:
 479                if (rc->rc_header.h_length < lock_size)
 480                        goto Eshort;
 481                receive_rcom_lock(ls, rc);
 482                break;
 483
 484        case DLM_RCOM_STATUS_REPLY:
 485                receive_sync_reply(ls, rc);
 486                break;
 487
 488        case DLM_RCOM_NAMES_REPLY:
 489                receive_sync_reply(ls, rc);
 490                break;
 491
 492        case DLM_RCOM_LOOKUP_REPLY:
 493                receive_rcom_lookup_reply(ls, rc);
 494                break;
 495
 496        case DLM_RCOM_LOCK_REPLY:
 497                if (rc->rc_header.h_length < lock_size)
 498                        goto Eshort;
 499                dlm_recover_process_copy(ls, rc);
 500                break;
 501
 502        default:
 503                log_error(ls, "receive_rcom bad type %d", rc->rc_type);
 504        }
 505out:
 506        return;
 507Eshort:
 508        log_error(ls, "recovery message %x from %d is too short",
 509                          rc->rc_type, nodeid);
 510}
 511
 512