1
2
3
4
5
6
7
8
9
10
11
12
13
14#include "dlm_internal.h"
15#include "lockspace.h"
16#include "member.h"
17#include "lowcomms.h"
18#include "midcomms.h"
19#include "rcom.h"
20#include "recover.h"
21#include "dir.h"
22#include "config.h"
23#include "memory.h"
24#include "lock.h"
25#include "util.h"
26
27static int rcom_response(struct dlm_ls *ls)
28{
29 return test_bit(LSFL_RCOM_READY, &ls->ls_flags);
30}
31
32static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
33 struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret)
34{
35 struct dlm_rcom *rc;
36 struct dlm_mhandle *mh;
37 char *mb;
38 int mb_len = sizeof(struct dlm_rcom) + len;
39
40 mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
41 if (!mh) {
42 log_print("create_rcom to %d type %d len %d ENOBUFS",
43 to_nodeid, type, len);
44 return -ENOBUFS;
45 }
46 memset(mb, 0, mb_len);
47
48 rc = (struct dlm_rcom *) mb;
49
50 rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
51 rc->rc_header.h_lockspace = ls->ls_global_id;
52 rc->rc_header.h_nodeid = dlm_our_nodeid();
53 rc->rc_header.h_length = mb_len;
54 rc->rc_header.h_cmd = DLM_RCOM;
55
56 rc->rc_type = type;
57
58 spin_lock(&ls->ls_recover_lock);
59 rc->rc_seq = ls->ls_recover_seq;
60 spin_unlock(&ls->ls_recover_lock);
61
62 *mh_ret = mh;
63 *rc_ret = rc;
64 return 0;
65}
66
67static void send_rcom(struct dlm_ls *ls, struct dlm_mhandle *mh,
68 struct dlm_rcom *rc)
69{
70 dlm_rcom_out(rc);
71 dlm_lowcomms_commit_buffer(mh);
72}
73
74static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs,
75 uint32_t flags)
76{
77 rs->rs_flags = cpu_to_le32(flags);
78}
79
80
81
82
83
84static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf,
85 uint32_t num_slots)
86{
87 rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen);
88 rf->rf_lsflags = cpu_to_le32(ls->ls_exflags);
89
90 rf->rf_our_slot = cpu_to_le16(ls->ls_slot);
91 rf->rf_num_slots = cpu_to_le16(num_slots);
92 rf->rf_generation = cpu_to_le32(ls->ls_generation);
93}
94
95static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
96{
97 struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
98
99 if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) {
100 log_error(ls, "version mismatch: %x nodeid %d: %x",
101 DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid,
102 rc->rc_header.h_version);
103 return -EPROTO;
104 }
105
106 if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen ||
107 le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) {
108 log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
109 ls->ls_lvblen, ls->ls_exflags, nodeid,
110 le32_to_cpu(rf->rf_lvblen),
111 le32_to_cpu(rf->rf_lsflags));
112 return -EPROTO;
113 }
114 return 0;
115}
116
117static void allow_sync_reply(struct dlm_ls *ls, uint64_t *new_seq)
118{
119 spin_lock(&ls->ls_rcom_spin);
120 *new_seq = ++ls->ls_rcom_seq;
121 set_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
122 spin_unlock(&ls->ls_rcom_spin);
123}
124
125static void disallow_sync_reply(struct dlm_ls *ls)
126{
127 spin_lock(&ls->ls_rcom_spin);
128 clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
129 clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
130 spin_unlock(&ls->ls_rcom_spin);
131}
132
133
134
135
136
137
138
139
140
141
142
143
144int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
145{
146 struct dlm_rcom *rc;
147 struct dlm_mhandle *mh;
148 int error = 0;
149
150 ls->ls_recover_nodeid = nodeid;
151
152 if (nodeid == dlm_our_nodeid()) {
153 rc = ls->ls_recover_buf;
154 rc->rc_result = dlm_recover_status(ls);
155 goto out;
156 }
157
158retry:
159 error = create_rcom(ls, nodeid, DLM_RCOM_STATUS,
160 sizeof(struct rcom_status), &rc, &mh);
161 if (error)
162 goto out;
163
164 set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags);
165
166 allow_sync_reply(ls, &rc->rc_id);
167 memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
168
169 send_rcom(ls, mh, rc);
170
171 error = dlm_wait_function(ls, &rcom_response);
172 disallow_sync_reply(ls);
173 if (error == -ETIMEDOUT)
174 goto retry;
175 if (error)
176 goto out;
177
178 rc = ls->ls_recover_buf;
179
180 if (rc->rc_result == -ESRCH) {
181
182 log_debug(ls, "remote node %d not ready", nodeid);
183 rc->rc_result = 0;
184 error = 0;
185 } else {
186 error = check_rcom_config(ls, rc, nodeid);
187 }
188
189
190 out:
191 return error;
192}
193
194static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
195{
196 struct dlm_rcom *rc;
197 struct dlm_mhandle *mh;
198 struct rcom_status *rs;
199 uint32_t status;
200 int nodeid = rc_in->rc_header.h_nodeid;
201 int len = sizeof(struct rcom_config);
202 int num_slots = 0;
203 int error;
204
205 if (!dlm_slots_version(&rc_in->rc_header)) {
206 status = dlm_recover_status(ls);
207 goto do_create;
208 }
209
210 rs = (struct rcom_status *)rc_in->rc_buf;
211
212 if (!(le32_to_cpu(rs->rs_flags) & DLM_RSF_NEED_SLOTS)) {
213 status = dlm_recover_status(ls);
214 goto do_create;
215 }
216
217 spin_lock(&ls->ls_recover_lock);
218 status = ls->ls_recover_status;
219 num_slots = ls->ls_num_slots;
220 spin_unlock(&ls->ls_recover_lock);
221 len += num_slots * sizeof(struct rcom_slot);
222
223 do_create:
224 error = create_rcom(ls, nodeid, DLM_RCOM_STATUS_REPLY,
225 len, &rc, &mh);
226 if (error)
227 return;
228
229 rc->rc_id = rc_in->rc_id;
230 rc->rc_seq_reply = rc_in->rc_seq;
231 rc->rc_result = status;
232
233 set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots);
234
235 if (!num_slots)
236 goto do_send;
237
238 spin_lock(&ls->ls_recover_lock);
239 if (ls->ls_num_slots != num_slots) {
240 spin_unlock(&ls->ls_recover_lock);
241 log_debug(ls, "receive_rcom_status num_slots %d to %d",
242 num_slots, ls->ls_num_slots);
243 rc->rc_result = 0;
244 set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0);
245 goto do_send;
246 }
247
248 dlm_slots_copy_out(ls, rc);
249 spin_unlock(&ls->ls_recover_lock);
250
251 do_send:
252 send_rcom(ls, mh, rc);
253}
254
255static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
256{
257 spin_lock(&ls->ls_rcom_spin);
258 if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
259 rc_in->rc_id != ls->ls_rcom_seq) {
260 log_debug(ls, "reject reply %d from %d seq %llx expect %llx",
261 rc_in->rc_type, rc_in->rc_header.h_nodeid,
262 (unsigned long long)rc_in->rc_id,
263 (unsigned long long)ls->ls_rcom_seq);
264 goto out;
265 }
266 memcpy(ls->ls_recover_buf, rc_in, rc_in->rc_header.h_length);
267 set_bit(LSFL_RCOM_READY, &ls->ls_flags);
268 clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
269 wake_up(&ls->ls_wait_general);
270 out:
271 spin_unlock(&ls->ls_rcom_spin);
272}
273
274int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
275{
276 struct dlm_rcom *rc;
277 struct dlm_mhandle *mh;
278 int error = 0;
279
280 ls->ls_recover_nodeid = nodeid;
281
282retry:
283 error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh);
284 if (error)
285 goto out;
286 memcpy(rc->rc_buf, last_name, last_len);
287
288 allow_sync_reply(ls, &rc->rc_id);
289 memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
290
291 send_rcom(ls, mh, rc);
292
293 error = dlm_wait_function(ls, &rcom_response);
294 disallow_sync_reply(ls);
295 if (error == -ETIMEDOUT)
296 goto retry;
297 out:
298 return error;
299}
300
301static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
302{
303 struct dlm_rcom *rc;
304 struct dlm_mhandle *mh;
305 int error, inlen, outlen, nodeid;
306
307 nodeid = rc_in->rc_header.h_nodeid;
308 inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
309 outlen = dlm_config.ci_buffer_size - sizeof(struct dlm_rcom);
310
311 error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, &rc, &mh);
312 if (error)
313 return;
314 rc->rc_id = rc_in->rc_id;
315 rc->rc_seq_reply = rc_in->rc_seq;
316
317 dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
318 nodeid);
319 send_rcom(ls, mh, rc);
320}
321
322int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
323{
324 struct dlm_rcom *rc;
325 struct dlm_mhandle *mh;
326 struct dlm_ls *ls = r->res_ls;
327 int error;
328
329 error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length,
330 &rc, &mh);
331 if (error)
332 goto out;
333 memcpy(rc->rc_buf, r->res_name, r->res_length);
334 rc->rc_id = (unsigned long) r->res_id;
335
336 send_rcom(ls, mh, rc);
337 out:
338 return error;
339}
340
341static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
342{
343 struct dlm_rcom *rc;
344 struct dlm_mhandle *mh;
345 int error, ret_nodeid, nodeid = rc_in->rc_header.h_nodeid;
346 int len = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
347
348 error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh);
349 if (error)
350 return;
351
352
353 if (rc_in->rc_id == 0xFFFFFFFF) {
354 log_error(ls, "receive_rcom_lookup dump from %d", nodeid);
355 dlm_dump_rsb_name(ls, rc_in->rc_buf, len);
356 return;
357 }
358
359 error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len,
360 DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL);
361 if (error)
362 ret_nodeid = error;
363 rc->rc_result = ret_nodeid;
364 rc->rc_id = rc_in->rc_id;
365 rc->rc_seq_reply = rc_in->rc_seq;
366
367 send_rcom(ls, mh, rc);
368}
369
370static void receive_rcom_lookup_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
371{
372 dlm_recover_master_reply(ls, rc_in);
373}
374
375static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb,
376 struct rcom_lock *rl)
377{
378 memset(rl, 0, sizeof(*rl));
379
380 rl->rl_ownpid = cpu_to_le32(lkb->lkb_ownpid);
381 rl->rl_lkid = cpu_to_le32(lkb->lkb_id);
382 rl->rl_exflags = cpu_to_le32(lkb->lkb_exflags);
383 rl->rl_flags = cpu_to_le32(lkb->lkb_flags);
384 rl->rl_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
385 rl->rl_rqmode = lkb->lkb_rqmode;
386 rl->rl_grmode = lkb->lkb_grmode;
387 rl->rl_status = lkb->lkb_status;
388 rl->rl_wait_type = cpu_to_le16(lkb->lkb_wait_type);
389
390 if (lkb->lkb_bastfn)
391 rl->rl_asts |= DLM_CB_BAST;
392 if (lkb->lkb_astfn)
393 rl->rl_asts |= DLM_CB_CAST;
394
395 rl->rl_namelen = cpu_to_le16(r->res_length);
396 memcpy(rl->rl_name, r->res_name, r->res_length);
397
398
399
400
401 if (lkb->lkb_lvbptr)
402 memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
403}
404
405int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
406{
407 struct dlm_ls *ls = r->res_ls;
408 struct dlm_rcom *rc;
409 struct dlm_mhandle *mh;
410 struct rcom_lock *rl;
411 int error, len = sizeof(struct rcom_lock);
412
413 if (lkb->lkb_lvbptr)
414 len += ls->ls_lvblen;
415
416 error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh);
417 if (error)
418 goto out;
419
420 rl = (struct rcom_lock *) rc->rc_buf;
421 pack_rcom_lock(r, lkb, rl);
422 rc->rc_id = (unsigned long) r;
423
424 send_rcom(ls, mh, rc);
425 out:
426 return error;
427}
428
429
430static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
431{
432 struct dlm_rcom *rc;
433 struct dlm_mhandle *mh;
434 int error, nodeid = rc_in->rc_header.h_nodeid;
435
436 dlm_recover_master_copy(ls, rc_in);
437
438 error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY,
439 sizeof(struct rcom_lock), &rc, &mh);
440 if (error)
441 return;
442
443
444
445
446 memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
447 rc->rc_id = rc_in->rc_id;
448 rc->rc_seq_reply = rc_in->rc_seq;
449
450 send_rcom(ls, mh, rc);
451}
452
453
454
455
456int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
457{
458 struct dlm_rcom *rc;
459 struct rcom_config *rf;
460 struct dlm_mhandle *mh;
461 char *mb;
462 int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
463
464 mh = dlm_lowcomms_get_buffer(nodeid, mb_len, GFP_NOFS, &mb);
465 if (!mh)
466 return -ENOBUFS;
467 memset(mb, 0, mb_len);
468
469 rc = (struct dlm_rcom *) mb;
470
471 rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
472 rc->rc_header.h_lockspace = rc_in->rc_header.h_lockspace;
473 rc->rc_header.h_nodeid = dlm_our_nodeid();
474 rc->rc_header.h_length = mb_len;
475 rc->rc_header.h_cmd = DLM_RCOM;
476
477 rc->rc_type = DLM_RCOM_STATUS_REPLY;
478 rc->rc_id = rc_in->rc_id;
479 rc->rc_seq_reply = rc_in->rc_seq;
480 rc->rc_result = -ESRCH;
481
482 rf = (struct rcom_config *) rc->rc_buf;
483 rf->rf_lvblen = cpu_to_le32(~0U);
484
485 dlm_rcom_out(rc);
486 dlm_lowcomms_commit_buffer(mh);
487
488 return 0;
489}
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
537{
538 int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
539 int stop, reply = 0, names = 0, lookup = 0, lock = 0;
540 uint32_t status;
541 uint64_t seq;
542
543 switch (rc->rc_type) {
544 case DLM_RCOM_STATUS_REPLY:
545 reply = 1;
546 break;
547 case DLM_RCOM_NAMES:
548 names = 1;
549 break;
550 case DLM_RCOM_NAMES_REPLY:
551 names = 1;
552 reply = 1;
553 break;
554 case DLM_RCOM_LOOKUP:
555 lookup = 1;
556 break;
557 case DLM_RCOM_LOOKUP_REPLY:
558 lookup = 1;
559 reply = 1;
560 break;
561 case DLM_RCOM_LOCK:
562 lock = 1;
563 break;
564 case DLM_RCOM_LOCK_REPLY:
565 lock = 1;
566 reply = 1;
567 break;
568 };
569
570 spin_lock(&ls->ls_recover_lock);
571 status = ls->ls_recover_status;
572 stop = test_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
573 seq = ls->ls_recover_seq;
574 spin_unlock(&ls->ls_recover_lock);
575
576 if (stop && (rc->rc_type != DLM_RCOM_STATUS))
577 goto ignore;
578
579 if (reply && (rc->rc_seq_reply != seq))
580 goto ignore;
581
582 if (!(status & DLM_RS_NODES) && (names || lookup || lock))
583 goto ignore;
584
585 if (!(status & DLM_RS_DIR) && (lookup || lock))
586 goto ignore;
587
588 switch (rc->rc_type) {
589 case DLM_RCOM_STATUS:
590 receive_rcom_status(ls, rc);
591 break;
592
593 case DLM_RCOM_NAMES:
594 receive_rcom_names(ls, rc);
595 break;
596
597 case DLM_RCOM_LOOKUP:
598 receive_rcom_lookup(ls, rc);
599 break;
600
601 case DLM_RCOM_LOCK:
602 if (rc->rc_header.h_length < lock_size)
603 goto Eshort;
604 receive_rcom_lock(ls, rc);
605 break;
606
607 case DLM_RCOM_STATUS_REPLY:
608 receive_sync_reply(ls, rc);
609 break;
610
611 case DLM_RCOM_NAMES_REPLY:
612 receive_sync_reply(ls, rc);
613 break;
614
615 case DLM_RCOM_LOOKUP_REPLY:
616 receive_rcom_lookup_reply(ls, rc);
617 break;
618
619 case DLM_RCOM_LOCK_REPLY:
620 if (rc->rc_header.h_length < lock_size)
621 goto Eshort;
622 dlm_recover_process_copy(ls, rc);
623 break;
624
625 default:
626 log_error(ls, "receive_rcom bad type %d", rc->rc_type);
627 }
628 return;
629
630ignore:
631 log_limit(ls, "dlm_receive_rcom ignore msg %d "
632 "from %d %llu %llu recover seq %llu sts %x gen %u",
633 rc->rc_type,
634 nodeid,
635 (unsigned long long)rc->rc_seq,
636 (unsigned long long)rc->rc_seq_reply,
637 (unsigned long long)seq,
638 status, ls->ls_generation);
639 return;
640Eshort:
641 log_error(ls, "recovery message %d from %d is too short",
642 rc->rc_type, nodeid);
643}
644
645