1
2
3
4
5
6
7
8
9
10
11
12#include "dlm_internal.h"
13#include "lockspace.h"
14#include "member.h"
15#include "lowcomms.h"
16#include "midcomms.h"
17#include "rcom.h"
18#include "recover.h"
19#include "dir.h"
20#include "config.h"
21#include "memory.h"
22#include "lock.h"
23#include "util.h"
24
25static int rcom_response(struct dlm_ls *ls)
26{
27 return test_bit(LSFL_RCOM_READY, &ls->ls_flags);
28}
29
30static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
31 struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret)
32{
33 struct dlm_rcom *rc;
34 struct dlm_mhandle *mh;
35 char *mb;
36 int mb_len = sizeof(struct dlm_rcom) + len;
37
38 mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
39 if (!mh) {
40 log_print("create_rcom to %d type %d len %d ENOBUFS",
41 to_nodeid, type, len);
42 return -ENOBUFS;
43 }
44 memset(mb, 0, mb_len);
45
46 rc = (struct dlm_rcom *) mb;
47
48 rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
49 rc->rc_header.h_lockspace = ls->ls_global_id;
50 rc->rc_header.h_nodeid = dlm_our_nodeid();
51 rc->rc_header.h_length = mb_len;
52 rc->rc_header.h_cmd = DLM_RCOM;
53
54 rc->rc_type = type;
55
56 spin_lock(&ls->ls_recover_lock);
57 rc->rc_seq = ls->ls_recover_seq;
58 spin_unlock(&ls->ls_recover_lock);
59
60 *mh_ret = mh;
61 *rc_ret = rc;
62 return 0;
63}
64
65static void send_rcom(struct dlm_ls *ls, struct dlm_mhandle *mh,
66 struct dlm_rcom *rc)
67{
68 dlm_rcom_out(rc);
69 dlm_lowcomms_commit_buffer(mh);
70}
71
72static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs,
73 uint32_t flags)
74{
75 rs->rs_flags = cpu_to_le32(flags);
76}
77
78
79
80
81
82static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf,
83 uint32_t num_slots)
84{
85 rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen);
86 rf->rf_lsflags = cpu_to_le32(ls->ls_exflags);
87
88 rf->rf_our_slot = cpu_to_le16(ls->ls_slot);
89 rf->rf_num_slots = cpu_to_le16(num_slots);
90 rf->rf_generation = cpu_to_le32(ls->ls_generation);
91}
92
93static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
94{
95 struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
96
97 if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) {
98 log_error(ls, "version mismatch: %x nodeid %d: %x",
99 DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid,
100 rc->rc_header.h_version);
101 return -EPROTO;
102 }
103
104 if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen ||
105 le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) {
106 log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
107 ls->ls_lvblen, ls->ls_exflags, nodeid,
108 le32_to_cpu(rf->rf_lvblen),
109 le32_to_cpu(rf->rf_lsflags));
110 return -EPROTO;
111 }
112 return 0;
113}
114
115static void allow_sync_reply(struct dlm_ls *ls, uint64_t *new_seq)
116{
117 spin_lock(&ls->ls_rcom_spin);
118 *new_seq = ++ls->ls_rcom_seq;
119 set_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
120 spin_unlock(&ls->ls_rcom_spin);
121}
122
123static void disallow_sync_reply(struct dlm_ls *ls)
124{
125 spin_lock(&ls->ls_rcom_spin);
126 clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
127 clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
128 spin_unlock(&ls->ls_rcom_spin);
129}
130
131
132
133
134
135
136
137
138
139
140
141
142int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
143{
144 struct dlm_rcom *rc;
145 struct dlm_mhandle *mh;
146 int error = 0;
147
148 ls->ls_recover_nodeid = nodeid;
149
150 if (nodeid == dlm_our_nodeid()) {
151 rc = ls->ls_recover_buf;
152 rc->rc_result = dlm_recover_status(ls);
153 goto out;
154 }
155
156retry:
157 error = create_rcom(ls, nodeid, DLM_RCOM_STATUS,
158 sizeof(struct rcom_status), &rc, &mh);
159 if (error)
160 goto out;
161
162 set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags);
163
164 allow_sync_reply(ls, &rc->rc_id);
165 memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
166
167 send_rcom(ls, mh, rc);
168
169 error = dlm_wait_function(ls, &rcom_response);
170 disallow_sync_reply(ls);
171 if (error == -ETIMEDOUT)
172 goto retry;
173 if (error)
174 goto out;
175
176 rc = ls->ls_recover_buf;
177
178 if (rc->rc_result == -ESRCH) {
179
180 log_debug(ls, "remote node %d not ready", nodeid);
181 rc->rc_result = 0;
182 error = 0;
183 } else {
184 error = check_rcom_config(ls, rc, nodeid);
185 }
186
187
188 out:
189 return error;
190}
191
192static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
193{
194 struct dlm_rcom *rc;
195 struct dlm_mhandle *mh;
196 struct rcom_status *rs;
197 uint32_t status;
198 int nodeid = rc_in->rc_header.h_nodeid;
199 int len = sizeof(struct rcom_config);
200 int num_slots = 0;
201 int error;
202
203 if (!dlm_slots_version(&rc_in->rc_header)) {
204 status = dlm_recover_status(ls);
205 goto do_create;
206 }
207
208 rs = (struct rcom_status *)rc_in->rc_buf;
209
210 if (!(le32_to_cpu(rs->rs_flags) & DLM_RSF_NEED_SLOTS)) {
211 status = dlm_recover_status(ls);
212 goto do_create;
213 }
214
215 spin_lock(&ls->ls_recover_lock);
216 status = ls->ls_recover_status;
217 num_slots = ls->ls_num_slots;
218 spin_unlock(&ls->ls_recover_lock);
219 len += num_slots * sizeof(struct rcom_slot);
220
221 do_create:
222 error = create_rcom(ls, nodeid, DLM_RCOM_STATUS_REPLY,
223 len, &rc, &mh);
224 if (error)
225 return;
226
227 rc->rc_id = rc_in->rc_id;
228 rc->rc_seq_reply = rc_in->rc_seq;
229 rc->rc_result = status;
230
231 set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots);
232
233 if (!num_slots)
234 goto do_send;
235
236 spin_lock(&ls->ls_recover_lock);
237 if (ls->ls_num_slots != num_slots) {
238 spin_unlock(&ls->ls_recover_lock);
239 log_debug(ls, "receive_rcom_status num_slots %d to %d",
240 num_slots, ls->ls_num_slots);
241 rc->rc_result = 0;
242 set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0);
243 goto do_send;
244 }
245
246 dlm_slots_copy_out(ls, rc);
247 spin_unlock(&ls->ls_recover_lock);
248
249 do_send:
250 send_rcom(ls, mh, rc);
251}
252
253static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
254{
255 spin_lock(&ls->ls_rcom_spin);
256 if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
257 rc_in->rc_id != ls->ls_rcom_seq) {
258 log_debug(ls, "reject reply %d from %d seq %llx expect %llx",
259 rc_in->rc_type, rc_in->rc_header.h_nodeid,
260 (unsigned long long)rc_in->rc_id,
261 (unsigned long long)ls->ls_rcom_seq);
262 goto out;
263 }
264 memcpy(ls->ls_recover_buf, rc_in, rc_in->rc_header.h_length);
265 set_bit(LSFL_RCOM_READY, &ls->ls_flags);
266 clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
267 wake_up(&ls->ls_wait_general);
268 out:
269 spin_unlock(&ls->ls_rcom_spin);
270}
271
272int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
273{
274 struct dlm_rcom *rc;
275 struct dlm_mhandle *mh;
276 int error = 0;
277
278 ls->ls_recover_nodeid = nodeid;
279
280retry:
281 error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh);
282 if (error)
283 goto out;
284 memcpy(rc->rc_buf, last_name, last_len);
285
286 allow_sync_reply(ls, &rc->rc_id);
287 memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
288
289 send_rcom(ls, mh, rc);
290
291 error = dlm_wait_function(ls, &rcom_response);
292 disallow_sync_reply(ls);
293 if (error == -ETIMEDOUT)
294 goto retry;
295 out:
296 return error;
297}
298
299static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
300{
301 struct dlm_rcom *rc;
302 struct dlm_mhandle *mh;
303 int error, inlen, outlen, nodeid;
304
305 nodeid = rc_in->rc_header.h_nodeid;
306 inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
307 outlen = dlm_config.ci_buffer_size - sizeof(struct dlm_rcom);
308
309 error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, &rc, &mh);
310 if (error)
311 return;
312 rc->rc_id = rc_in->rc_id;
313 rc->rc_seq_reply = rc_in->rc_seq;
314
315 dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
316 nodeid);
317 send_rcom(ls, mh, rc);
318}
319
320int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
321{
322 struct dlm_rcom *rc;
323 struct dlm_mhandle *mh;
324 struct dlm_ls *ls = r->res_ls;
325 int error;
326
327 error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length,
328 &rc, &mh);
329 if (error)
330 goto out;
331 memcpy(rc->rc_buf, r->res_name, r->res_length);
332 rc->rc_id = (unsigned long) r->res_id;
333
334 send_rcom(ls, mh, rc);
335 out:
336 return error;
337}
338
339static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
340{
341 struct dlm_rcom *rc;
342 struct dlm_mhandle *mh;
343 int error, ret_nodeid, nodeid = rc_in->rc_header.h_nodeid;
344 int len = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
345
346 error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh);
347 if (error)
348 return;
349
350
351 if (rc_in->rc_id == 0xFFFFFFFF) {
352 log_error(ls, "receive_rcom_lookup dump from %d", nodeid);
353 dlm_dump_rsb_name(ls, rc_in->rc_buf, len);
354 return;
355 }
356
357 error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len,
358 DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL);
359 if (error)
360 ret_nodeid = error;
361 rc->rc_result = ret_nodeid;
362 rc->rc_id = rc_in->rc_id;
363 rc->rc_seq_reply = rc_in->rc_seq;
364
365 send_rcom(ls, mh, rc);
366}
367
368static void receive_rcom_lookup_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
369{
370 dlm_recover_master_reply(ls, rc_in);
371}
372
373static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb,
374 struct rcom_lock *rl)
375{
376 memset(rl, 0, sizeof(*rl));
377
378 rl->rl_ownpid = cpu_to_le32(lkb->lkb_ownpid);
379 rl->rl_lkid = cpu_to_le32(lkb->lkb_id);
380 rl->rl_exflags = cpu_to_le32(lkb->lkb_exflags);
381 rl->rl_flags = cpu_to_le32(lkb->lkb_flags);
382 rl->rl_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
383 rl->rl_rqmode = lkb->lkb_rqmode;
384 rl->rl_grmode = lkb->lkb_grmode;
385 rl->rl_status = lkb->lkb_status;
386 rl->rl_wait_type = cpu_to_le16(lkb->lkb_wait_type);
387
388 if (lkb->lkb_bastfn)
389 rl->rl_asts |= DLM_CB_BAST;
390 if (lkb->lkb_astfn)
391 rl->rl_asts |= DLM_CB_CAST;
392
393 rl->rl_namelen = cpu_to_le16(r->res_length);
394 memcpy(rl->rl_name, r->res_name, r->res_length);
395
396
397
398
399 if (lkb->lkb_lvbptr)
400 memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
401}
402
403int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
404{
405 struct dlm_ls *ls = r->res_ls;
406 struct dlm_rcom *rc;
407 struct dlm_mhandle *mh;
408 struct rcom_lock *rl;
409 int error, len = sizeof(struct rcom_lock);
410
411 if (lkb->lkb_lvbptr)
412 len += ls->ls_lvblen;
413
414 error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh);
415 if (error)
416 goto out;
417
418 rl = (struct rcom_lock *) rc->rc_buf;
419 pack_rcom_lock(r, lkb, rl);
420 rc->rc_id = (unsigned long) r;
421
422 send_rcom(ls, mh, rc);
423 out:
424 return error;
425}
426
427
428static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
429{
430 struct dlm_rcom *rc;
431 struct dlm_mhandle *mh;
432 int error, nodeid = rc_in->rc_header.h_nodeid;
433
434 dlm_recover_master_copy(ls, rc_in);
435
436 error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY,
437 sizeof(struct rcom_lock), &rc, &mh);
438 if (error)
439 return;
440
441
442
443
444 memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
445 rc->rc_id = rc_in->rc_id;
446 rc->rc_seq_reply = rc_in->rc_seq;
447
448 send_rcom(ls, mh, rc);
449}
450
451
452
453
454int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
455{
456 struct dlm_rcom *rc;
457 struct rcom_config *rf;
458 struct dlm_mhandle *mh;
459 char *mb;
460 int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
461
462 mh = dlm_lowcomms_get_buffer(nodeid, mb_len, GFP_NOFS, &mb);
463 if (!mh)
464 return -ENOBUFS;
465 memset(mb, 0, mb_len);
466
467 rc = (struct dlm_rcom *) mb;
468
469 rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
470 rc->rc_header.h_lockspace = rc_in->rc_header.h_lockspace;
471 rc->rc_header.h_nodeid = dlm_our_nodeid();
472 rc->rc_header.h_length = mb_len;
473 rc->rc_header.h_cmd = DLM_RCOM;
474
475 rc->rc_type = DLM_RCOM_STATUS_REPLY;
476 rc->rc_id = rc_in->rc_id;
477 rc->rc_seq_reply = rc_in->rc_seq;
478 rc->rc_result = -ESRCH;
479
480 rf = (struct rcom_config *) rc->rc_buf;
481 rf->rf_lvblen = cpu_to_le32(~0U);
482
483 dlm_rcom_out(rc);
484 dlm_lowcomms_commit_buffer(mh);
485
486 return 0;
487}
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
535{
536 int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
537 int stop, reply = 0, names = 0, lookup = 0, lock = 0;
538 uint32_t status;
539 uint64_t seq;
540
541 switch (rc->rc_type) {
542 case DLM_RCOM_STATUS_REPLY:
543 reply = 1;
544 break;
545 case DLM_RCOM_NAMES:
546 names = 1;
547 break;
548 case DLM_RCOM_NAMES_REPLY:
549 names = 1;
550 reply = 1;
551 break;
552 case DLM_RCOM_LOOKUP:
553 lookup = 1;
554 break;
555 case DLM_RCOM_LOOKUP_REPLY:
556 lookup = 1;
557 reply = 1;
558 break;
559 case DLM_RCOM_LOCK:
560 lock = 1;
561 break;
562 case DLM_RCOM_LOCK_REPLY:
563 lock = 1;
564 reply = 1;
565 break;
566 };
567
568 spin_lock(&ls->ls_recover_lock);
569 status = ls->ls_recover_status;
570 stop = test_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
571 seq = ls->ls_recover_seq;
572 spin_unlock(&ls->ls_recover_lock);
573
574 if (stop && (rc->rc_type != DLM_RCOM_STATUS))
575 goto ignore;
576
577 if (reply && (rc->rc_seq_reply != seq))
578 goto ignore;
579
580 if (!(status & DLM_RS_NODES) && (names || lookup || lock))
581 goto ignore;
582
583 if (!(status & DLM_RS_DIR) && (lookup || lock))
584 goto ignore;
585
586 switch (rc->rc_type) {
587 case DLM_RCOM_STATUS:
588 receive_rcom_status(ls, rc);
589 break;
590
591 case DLM_RCOM_NAMES:
592 receive_rcom_names(ls, rc);
593 break;
594
595 case DLM_RCOM_LOOKUP:
596 receive_rcom_lookup(ls, rc);
597 break;
598
599 case DLM_RCOM_LOCK:
600 if (rc->rc_header.h_length < lock_size)
601 goto Eshort;
602 receive_rcom_lock(ls, rc);
603 break;
604
605 case DLM_RCOM_STATUS_REPLY:
606 receive_sync_reply(ls, rc);
607 break;
608
609 case DLM_RCOM_NAMES_REPLY:
610 receive_sync_reply(ls, rc);
611 break;
612
613 case DLM_RCOM_LOOKUP_REPLY:
614 receive_rcom_lookup_reply(ls, rc);
615 break;
616
617 case DLM_RCOM_LOCK_REPLY:
618 if (rc->rc_header.h_length < lock_size)
619 goto Eshort;
620 dlm_recover_process_copy(ls, rc);
621 break;
622
623 default:
624 log_error(ls, "receive_rcom bad type %d", rc->rc_type);
625 }
626 return;
627
628ignore:
629 log_limit(ls, "dlm_receive_rcom ignore msg %d "
630 "from %d %llu %llu recover seq %llu sts %x gen %u",
631 rc->rc_type,
632 nodeid,
633 (unsigned long long)rc->rc_seq,
634 (unsigned long long)rc->rc_seq_reply,
635 (unsigned long long)seq,
636 status, ls->ls_generation);
637 return;
638Eshort:
639 log_error(ls, "recovery message %d from %d is too short",
640 rc->rc_type, nodeid);
641}
642
643