1
2
3
4
5
6
7
8
9
10
11
12
13
14#include "dlm_internal.h"
15#include "lockspace.h"
16#include "member.h"
17#include "lowcomms.h"
18#include "midcomms.h"
19#include "rcom.h"
20#include "recover.h"
21#include "dir.h"
22#include "config.h"
23#include "memory.h"
24#include "lock.h"
25#include "util.h"
26
27static int rcom_response(struct dlm_ls *ls)
28{
29 return test_bit(LSFL_RCOM_READY, &ls->ls_flags);
30}
31
32static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
33 struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret)
34{
35 struct dlm_rcom *rc;
36 struct dlm_mhandle *mh;
37 char *mb;
38 int mb_len = sizeof(struct dlm_rcom) + len;
39
40 mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
41 if (!mh) {
42 log_print("create_rcom to %d type %d len %d ENOBUFS",
43 to_nodeid, type, len);
44 return -ENOBUFS;
45 }
46 memset(mb, 0, mb_len);
47
48 rc = (struct dlm_rcom *) mb;
49
50 rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
51 rc->rc_header.h_lockspace = ls->ls_global_id;
52 rc->rc_header.h_nodeid = dlm_our_nodeid();
53 rc->rc_header.h_length = mb_len;
54 rc->rc_header.h_cmd = DLM_RCOM;
55
56 rc->rc_type = type;
57
58 spin_lock(&ls->ls_recover_lock);
59 rc->rc_seq = ls->ls_recover_seq;
60 spin_unlock(&ls->ls_recover_lock);
61
62 *mh_ret = mh;
63 *rc_ret = rc;
64 return 0;
65}
66
67static void send_rcom(struct dlm_ls *ls, struct dlm_mhandle *mh,
68 struct dlm_rcom *rc)
69{
70 dlm_rcom_out(rc);
71 dlm_lowcomms_commit_buffer(mh);
72}
73
74static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs,
75 uint32_t flags)
76{
77 rs->rs_flags = cpu_to_le32(flags);
78}
79
80
81
82
83
84static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf,
85 uint32_t num_slots)
86{
87 rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen);
88 rf->rf_lsflags = cpu_to_le32(ls->ls_exflags);
89
90 rf->rf_our_slot = cpu_to_le16(ls->ls_slot);
91 rf->rf_num_slots = cpu_to_le16(num_slots);
92 rf->rf_generation = cpu_to_le32(ls->ls_generation);
93}
94
95static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
96{
97 struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
98
99 if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) {
100 log_error(ls, "version mismatch: %x nodeid %d: %x",
101 DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid,
102 rc->rc_header.h_version);
103 return -EPROTO;
104 }
105
106 if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen ||
107 le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) {
108 log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
109 ls->ls_lvblen, ls->ls_exflags, nodeid,
110 le32_to_cpu(rf->rf_lvblen),
111 le32_to_cpu(rf->rf_lsflags));
112 return -EPROTO;
113 }
114 return 0;
115}
116
117static void allow_sync_reply(struct dlm_ls *ls, uint64_t *new_seq)
118{
119 spin_lock(&ls->ls_rcom_spin);
120 *new_seq = ++ls->ls_rcom_seq;
121 set_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
122 spin_unlock(&ls->ls_rcom_spin);
123}
124
125static void disallow_sync_reply(struct dlm_ls *ls)
126{
127 spin_lock(&ls->ls_rcom_spin);
128 clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
129 clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
130 spin_unlock(&ls->ls_rcom_spin);
131}
132
133
134
135
136
137
138
139
140
141
142
143
144int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
145{
146 struct dlm_rcom *rc;
147 struct dlm_mhandle *mh;
148 int error = 0;
149
150 ls->ls_recover_nodeid = nodeid;
151
152 if (nodeid == dlm_our_nodeid()) {
153 rc = ls->ls_recover_buf;
154 rc->rc_result = dlm_recover_status(ls);
155 goto out;
156 }
157
158 error = create_rcom(ls, nodeid, DLM_RCOM_STATUS,
159 sizeof(struct rcom_status), &rc, &mh);
160 if (error)
161 goto out;
162
163 set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags);
164
165 allow_sync_reply(ls, &rc->rc_id);
166 memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
167
168 send_rcom(ls, mh, rc);
169
170 error = dlm_wait_function(ls, &rcom_response);
171 disallow_sync_reply(ls);
172 if (error)
173 goto out;
174
175 rc = ls->ls_recover_buf;
176
177 if (rc->rc_result == -ESRCH) {
178
179 log_debug(ls, "remote node %d not ready", nodeid);
180 rc->rc_result = 0;
181 error = 0;
182 } else {
183 error = check_rcom_config(ls, rc, nodeid);
184 }
185
186
187 out:
188 return error;
189}
190
191static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
192{
193 struct dlm_rcom *rc;
194 struct dlm_mhandle *mh;
195 struct rcom_status *rs;
196 uint32_t status;
197 int nodeid = rc_in->rc_header.h_nodeid;
198 int len = sizeof(struct rcom_config);
199 int num_slots = 0;
200 int error;
201
202 if (!dlm_slots_version(&rc_in->rc_header)) {
203 status = dlm_recover_status(ls);
204 goto do_create;
205 }
206
207 rs = (struct rcom_status *)rc_in->rc_buf;
208
209 if (!(le32_to_cpu(rs->rs_flags) & DLM_RSF_NEED_SLOTS)) {
210 status = dlm_recover_status(ls);
211 goto do_create;
212 }
213
214 spin_lock(&ls->ls_recover_lock);
215 status = ls->ls_recover_status;
216 num_slots = ls->ls_num_slots;
217 spin_unlock(&ls->ls_recover_lock);
218 len += num_slots * sizeof(struct rcom_slot);
219
220 do_create:
221 error = create_rcom(ls, nodeid, DLM_RCOM_STATUS_REPLY,
222 len, &rc, &mh);
223 if (error)
224 return;
225
226 rc->rc_id = rc_in->rc_id;
227 rc->rc_seq_reply = rc_in->rc_seq;
228 rc->rc_result = status;
229
230 set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots);
231
232 if (!num_slots)
233 goto do_send;
234
235 spin_lock(&ls->ls_recover_lock);
236 if (ls->ls_num_slots != num_slots) {
237 spin_unlock(&ls->ls_recover_lock);
238 log_debug(ls, "receive_rcom_status num_slots %d to %d",
239 num_slots, ls->ls_num_slots);
240 rc->rc_result = 0;
241 set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0);
242 goto do_send;
243 }
244
245 dlm_slots_copy_out(ls, rc);
246 spin_unlock(&ls->ls_recover_lock);
247
248 do_send:
249 send_rcom(ls, mh, rc);
250}
251
252static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
253{
254 spin_lock(&ls->ls_rcom_spin);
255 if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
256 rc_in->rc_id != ls->ls_rcom_seq) {
257 log_debug(ls, "reject reply %d from %d seq %llx expect %llx",
258 rc_in->rc_type, rc_in->rc_header.h_nodeid,
259 (unsigned long long)rc_in->rc_id,
260 (unsigned long long)ls->ls_rcom_seq);
261 goto out;
262 }
263 memcpy(ls->ls_recover_buf, rc_in, rc_in->rc_header.h_length);
264 set_bit(LSFL_RCOM_READY, &ls->ls_flags);
265 clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
266 wake_up(&ls->ls_wait_general);
267 out:
268 spin_unlock(&ls->ls_rcom_spin);
269}
270
271int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
272{
273 struct dlm_rcom *rc;
274 struct dlm_mhandle *mh;
275 int error = 0;
276
277 ls->ls_recover_nodeid = nodeid;
278
279 error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh);
280 if (error)
281 goto out;
282 memcpy(rc->rc_buf, last_name, last_len);
283
284 allow_sync_reply(ls, &rc->rc_id);
285 memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
286
287 send_rcom(ls, mh, rc);
288
289 error = dlm_wait_function(ls, &rcom_response);
290 disallow_sync_reply(ls);
291 out:
292 return error;
293}
294
295static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
296{
297 struct dlm_rcom *rc;
298 struct dlm_mhandle *mh;
299 int error, inlen, outlen, nodeid;
300
301 nodeid = rc_in->rc_header.h_nodeid;
302 inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
303 outlen = dlm_config.ci_buffer_size - sizeof(struct dlm_rcom);
304
305 error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, &rc, &mh);
306 if (error)
307 return;
308 rc->rc_id = rc_in->rc_id;
309 rc->rc_seq_reply = rc_in->rc_seq;
310
311 dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
312 nodeid);
313 send_rcom(ls, mh, rc);
314}
315
316int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
317{
318 struct dlm_rcom *rc;
319 struct dlm_mhandle *mh;
320 struct dlm_ls *ls = r->res_ls;
321 int error;
322
323 error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length,
324 &rc, &mh);
325 if (error)
326 goto out;
327 memcpy(rc->rc_buf, r->res_name, r->res_length);
328 rc->rc_id = (unsigned long) r->res_id;
329
330 send_rcom(ls, mh, rc);
331 out:
332 return error;
333}
334
335int dlm_send_rcom_lookup_dump(struct dlm_rsb *r, int to_nodeid)
336{
337 struct dlm_rcom *rc;
338 struct dlm_mhandle *mh;
339 struct dlm_ls *ls = r->res_ls;
340 int error;
341
342 error = create_rcom(ls, to_nodeid, DLM_RCOM_LOOKUP, r->res_length,
343 &rc, &mh);
344 if (error)
345 goto out;
346 memcpy(rc->rc_buf, r->res_name, r->res_length);
347 rc->rc_id = 0xFFFFFFFF;
348
349 send_rcom(ls, mh, rc);
350 out:
351 return error;
352}
353
354static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
355{
356 struct dlm_rcom *rc;
357 struct dlm_mhandle *mh;
358 int error, ret_nodeid, nodeid = rc_in->rc_header.h_nodeid;
359 int len = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
360
361 error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh);
362 if (error)
363 return;
364
365 if (rc_in->rc_id == 0xFFFFFFFF) {
366 log_error(ls, "receive_rcom_lookup dump from %d", nodeid);
367 dlm_dump_rsb_name(ls, rc_in->rc_buf, len);
368 return;
369 }
370
371 error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len,
372 DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL);
373 if (error)
374 ret_nodeid = error;
375 rc->rc_result = ret_nodeid;
376 rc->rc_id = rc_in->rc_id;
377 rc->rc_seq_reply = rc_in->rc_seq;
378
379 send_rcom(ls, mh, rc);
380}
381
382static void receive_rcom_lookup_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
383{
384 dlm_recover_master_reply(ls, rc_in);
385}
386
387static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb,
388 struct rcom_lock *rl)
389{
390 memset(rl, 0, sizeof(*rl));
391
392 rl->rl_ownpid = cpu_to_le32(lkb->lkb_ownpid);
393 rl->rl_lkid = cpu_to_le32(lkb->lkb_id);
394 rl->rl_exflags = cpu_to_le32(lkb->lkb_exflags);
395 rl->rl_flags = cpu_to_le32(lkb->lkb_flags);
396 rl->rl_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
397 rl->rl_rqmode = lkb->lkb_rqmode;
398 rl->rl_grmode = lkb->lkb_grmode;
399 rl->rl_status = lkb->lkb_status;
400 rl->rl_wait_type = cpu_to_le16(lkb->lkb_wait_type);
401
402 if (lkb->lkb_bastfn)
403 rl->rl_asts |= DLM_CB_BAST;
404 if (lkb->lkb_astfn)
405 rl->rl_asts |= DLM_CB_CAST;
406
407 rl->rl_namelen = cpu_to_le16(r->res_length);
408 memcpy(rl->rl_name, r->res_name, r->res_length);
409
410
411
412
413 if (lkb->lkb_lvbptr)
414 memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
415}
416
417int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
418{
419 struct dlm_ls *ls = r->res_ls;
420 struct dlm_rcom *rc;
421 struct dlm_mhandle *mh;
422 struct rcom_lock *rl;
423 int error, len = sizeof(struct rcom_lock);
424
425 if (lkb->lkb_lvbptr)
426 len += ls->ls_lvblen;
427
428 error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh);
429 if (error)
430 goto out;
431
432 rl = (struct rcom_lock *) rc->rc_buf;
433 pack_rcom_lock(r, lkb, rl);
434 rc->rc_id = (unsigned long) r;
435
436 send_rcom(ls, mh, rc);
437 out:
438 return error;
439}
440
441
442static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
443{
444 struct dlm_rcom *rc;
445 struct dlm_mhandle *mh;
446 int error, nodeid = rc_in->rc_header.h_nodeid;
447
448 dlm_recover_master_copy(ls, rc_in);
449
450 error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY,
451 sizeof(struct rcom_lock), &rc, &mh);
452 if (error)
453 return;
454
455
456
457
458 memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
459 rc->rc_id = rc_in->rc_id;
460 rc->rc_seq_reply = rc_in->rc_seq;
461
462 send_rcom(ls, mh, rc);
463}
464
465
466
467
468int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
469{
470 struct dlm_rcom *rc;
471 struct rcom_config *rf;
472 struct dlm_mhandle *mh;
473 char *mb;
474 int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
475
476 mh = dlm_lowcomms_get_buffer(nodeid, mb_len, GFP_NOFS, &mb);
477 if (!mh)
478 return -ENOBUFS;
479 memset(mb, 0, mb_len);
480
481 rc = (struct dlm_rcom *) mb;
482
483 rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
484 rc->rc_header.h_lockspace = rc_in->rc_header.h_lockspace;
485 rc->rc_header.h_nodeid = dlm_our_nodeid();
486 rc->rc_header.h_length = mb_len;
487 rc->rc_header.h_cmd = DLM_RCOM;
488
489 rc->rc_type = DLM_RCOM_STATUS_REPLY;
490 rc->rc_id = rc_in->rc_id;
491 rc->rc_seq_reply = rc_in->rc_seq;
492 rc->rc_result = -ESRCH;
493
494 rf = (struct rcom_config *) rc->rc_buf;
495 rf->rf_lvblen = cpu_to_le32(~0U);
496
497 dlm_rcom_out(rc);
498 dlm_lowcomms_commit_buffer(mh);
499
500 return 0;
501}
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
549{
550 int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
551 int stop, reply = 0, names = 0, lookup = 0, lock = 0;
552 uint32_t status;
553 uint64_t seq;
554
555 switch (rc->rc_type) {
556 case DLM_RCOM_STATUS_REPLY:
557 reply = 1;
558 break;
559 case DLM_RCOM_NAMES:
560 names = 1;
561 break;
562 case DLM_RCOM_NAMES_REPLY:
563 names = 1;
564 reply = 1;
565 break;
566 case DLM_RCOM_LOOKUP:
567 lookup = 1;
568 break;
569 case DLM_RCOM_LOOKUP_REPLY:
570 lookup = 1;
571 reply = 1;
572 break;
573 case DLM_RCOM_LOCK:
574 lock = 1;
575 break;
576 case DLM_RCOM_LOCK_REPLY:
577 lock = 1;
578 reply = 1;
579 break;
580 };
581
582 spin_lock(&ls->ls_recover_lock);
583 status = ls->ls_recover_status;
584 stop = test_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
585 seq = ls->ls_recover_seq;
586 spin_unlock(&ls->ls_recover_lock);
587
588 if (stop && (rc->rc_type != DLM_RCOM_STATUS))
589 goto ignore;
590
591 if (reply && (rc->rc_seq_reply != seq))
592 goto ignore;
593
594 if (!(status & DLM_RS_NODES) && (names || lookup || lock))
595 goto ignore;
596
597 if (!(status & DLM_RS_DIR) && (lookup || lock))
598 goto ignore;
599
600 switch (rc->rc_type) {
601 case DLM_RCOM_STATUS:
602 receive_rcom_status(ls, rc);
603 break;
604
605 case DLM_RCOM_NAMES:
606 receive_rcom_names(ls, rc);
607 break;
608
609 case DLM_RCOM_LOOKUP:
610 receive_rcom_lookup(ls, rc);
611 break;
612
613 case DLM_RCOM_LOCK:
614 if (rc->rc_header.h_length < lock_size)
615 goto Eshort;
616 receive_rcom_lock(ls, rc);
617 break;
618
619 case DLM_RCOM_STATUS_REPLY:
620 receive_sync_reply(ls, rc);
621 break;
622
623 case DLM_RCOM_NAMES_REPLY:
624 receive_sync_reply(ls, rc);
625 break;
626
627 case DLM_RCOM_LOOKUP_REPLY:
628 receive_rcom_lookup_reply(ls, rc);
629 break;
630
631 case DLM_RCOM_LOCK_REPLY:
632 if (rc->rc_header.h_length < lock_size)
633 goto Eshort;
634 dlm_recover_process_copy(ls, rc);
635 break;
636
637 default:
638 log_error(ls, "receive_rcom bad type %d", rc->rc_type);
639 }
640 return;
641
642ignore:
643 log_limit(ls, "dlm_receive_rcom ignore msg %d "
644 "from %d %llu %llu recover seq %llu sts %x gen %u",
645 rc->rc_type,
646 nodeid,
647 (unsigned long long)rc->rc_seq,
648 (unsigned long long)rc->rc_seq_reply,
649 (unsigned long long)seq,
650 status, ls->ls_generation);
651 return;
652Eshort:
653 log_error(ls, "recovery message %d from %d is too short",
654 rc->rc_type, nodeid);
655}
656
657