1
2
3
4
5
6
7
8
9
10
11
12#include "dlm_internal.h"
13#include "lockspace.h"
14#include "member.h"
15#include "lowcomms.h"
16#include "midcomms.h"
17#include "rcom.h"
18#include "recover.h"
19#include "dir.h"
20#include "config.h"
21#include "memory.h"
22#include "lock.h"
23#include "util.h"
24
25static int rcom_response(struct dlm_ls *ls)
26{
27 return test_bit(LSFL_RCOM_READY, &ls->ls_flags);
28}
29
30static void _create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
31 struct dlm_rcom **rc_ret, char *mb, int mb_len)
32{
33 struct dlm_rcom *rc;
34
35 rc = (struct dlm_rcom *) mb;
36
37 rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
38 rc->rc_header.u.h_lockspace = ls->ls_global_id;
39 rc->rc_header.h_nodeid = dlm_our_nodeid();
40 rc->rc_header.h_length = mb_len;
41 rc->rc_header.h_cmd = DLM_RCOM;
42
43 rc->rc_type = type;
44
45 spin_lock(&ls->ls_recover_lock);
46 rc->rc_seq = ls->ls_recover_seq;
47 spin_unlock(&ls->ls_recover_lock);
48
49 *rc_ret = rc;
50}
51
52static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
53 struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret)
54{
55 int mb_len = sizeof(struct dlm_rcom) + len;
56 struct dlm_mhandle *mh;
57 char *mb;
58
59 mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, GFP_NOFS, &mb);
60 if (!mh) {
61 log_print("%s to %d type %d len %d ENOBUFS",
62 __func__, to_nodeid, type, len);
63 return -ENOBUFS;
64 }
65
66 _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len);
67 *mh_ret = mh;
68 return 0;
69}
70
71static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type,
72 int len, struct dlm_rcom **rc_ret,
73 struct dlm_msg **msg_ret)
74{
75 int mb_len = sizeof(struct dlm_rcom) + len;
76 struct dlm_msg *msg;
77 char *mb;
78
79 msg = dlm_lowcomms_new_msg(to_nodeid, mb_len, GFP_NOFS, &mb,
80 NULL, NULL);
81 if (!msg) {
82 log_print("create_rcom to %d type %d len %d ENOBUFS",
83 to_nodeid, type, len);
84 return -ENOBUFS;
85 }
86
87 _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len);
88 *msg_ret = msg;
89 return 0;
90}
91
92static void send_rcom(struct dlm_mhandle *mh, struct dlm_rcom *rc)
93{
94 dlm_rcom_out(rc);
95 dlm_midcomms_commit_mhandle(mh);
96}
97
98static void send_rcom_stateless(struct dlm_msg *msg, struct dlm_rcom *rc)
99{
100 dlm_rcom_out(rc);
101 dlm_lowcomms_commit_msg(msg);
102 dlm_lowcomms_put_msg(msg);
103}
104
105static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs,
106 uint32_t flags)
107{
108 rs->rs_flags = cpu_to_le32(flags);
109}
110
111
112
113
114
115static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf,
116 uint32_t num_slots)
117{
118 rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen);
119 rf->rf_lsflags = cpu_to_le32(ls->ls_exflags);
120
121 rf->rf_our_slot = cpu_to_le16(ls->ls_slot);
122 rf->rf_num_slots = cpu_to_le16(num_slots);
123 rf->rf_generation = cpu_to_le32(ls->ls_generation);
124}
125
126static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
127{
128 struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
129
130 if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) {
131 log_error(ls, "version mismatch: %x nodeid %d: %x",
132 DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid,
133 rc->rc_header.h_version);
134 return -EPROTO;
135 }
136
137 if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen ||
138 le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) {
139 log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
140 ls->ls_lvblen, ls->ls_exflags, nodeid,
141 le32_to_cpu(rf->rf_lvblen),
142 le32_to_cpu(rf->rf_lsflags));
143 return -EPROTO;
144 }
145 return 0;
146}
147
148static void allow_sync_reply(struct dlm_ls *ls, uint64_t *new_seq)
149{
150 spin_lock(&ls->ls_rcom_spin);
151 *new_seq = ++ls->ls_rcom_seq;
152 set_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
153 spin_unlock(&ls->ls_rcom_spin);
154}
155
156static void disallow_sync_reply(struct dlm_ls *ls)
157{
158 spin_lock(&ls->ls_rcom_spin);
159 clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
160 clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
161 spin_unlock(&ls->ls_rcom_spin);
162}
163
164
165
166
167
168
169
170
171
172
173
174
175int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
176{
177 struct dlm_rcom *rc;
178 struct dlm_msg *msg;
179 int error = 0;
180
181 ls->ls_recover_nodeid = nodeid;
182
183 if (nodeid == dlm_our_nodeid()) {
184 rc = ls->ls_recover_buf;
185 rc->rc_result = dlm_recover_status(ls);
186 goto out;
187 }
188
189retry:
190 error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS,
191 sizeof(struct rcom_status), &rc, &msg);
192 if (error)
193 goto out;
194
195 set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags);
196
197 allow_sync_reply(ls, &rc->rc_id);
198 memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE);
199
200 send_rcom_stateless(msg, rc);
201
202 error = dlm_wait_function(ls, &rcom_response);
203 disallow_sync_reply(ls);
204 if (error == -ETIMEDOUT)
205 goto retry;
206 if (error)
207 goto out;
208
209 rc = ls->ls_recover_buf;
210
211 if (rc->rc_result == -ESRCH) {
212
213 log_debug(ls, "remote node %d not ready", nodeid);
214 rc->rc_result = 0;
215 error = 0;
216 } else {
217 error = check_rcom_config(ls, rc, nodeid);
218 }
219
220
221 out:
222 return error;
223}
224
225static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
226{
227 struct dlm_rcom *rc;
228 struct rcom_status *rs;
229 uint32_t status;
230 int nodeid = rc_in->rc_header.h_nodeid;
231 int len = sizeof(struct rcom_config);
232 struct dlm_msg *msg;
233 int num_slots = 0;
234 int error;
235
236 if (!dlm_slots_version(&rc_in->rc_header)) {
237 status = dlm_recover_status(ls);
238 goto do_create;
239 }
240
241 rs = (struct rcom_status *)rc_in->rc_buf;
242
243 if (!(le32_to_cpu(rs->rs_flags) & DLM_RSF_NEED_SLOTS)) {
244 status = dlm_recover_status(ls);
245 goto do_create;
246 }
247
248 spin_lock(&ls->ls_recover_lock);
249 status = ls->ls_recover_status;
250 num_slots = ls->ls_num_slots;
251 spin_unlock(&ls->ls_recover_lock);
252 len += num_slots * sizeof(struct rcom_slot);
253
254 do_create:
255 error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS_REPLY,
256 len, &rc, &msg);
257 if (error)
258 return;
259
260 rc->rc_id = rc_in->rc_id;
261 rc->rc_seq_reply = rc_in->rc_seq;
262 rc->rc_result = status;
263
264 set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots);
265
266 if (!num_slots)
267 goto do_send;
268
269 spin_lock(&ls->ls_recover_lock);
270 if (ls->ls_num_slots != num_slots) {
271 spin_unlock(&ls->ls_recover_lock);
272 log_debug(ls, "receive_rcom_status num_slots %d to %d",
273 num_slots, ls->ls_num_slots);
274 rc->rc_result = 0;
275 set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0);
276 goto do_send;
277 }
278
279 dlm_slots_copy_out(ls, rc);
280 spin_unlock(&ls->ls_recover_lock);
281
282 do_send:
283 send_rcom_stateless(msg, rc);
284}
285
286static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
287{
288 spin_lock(&ls->ls_rcom_spin);
289 if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
290 rc_in->rc_id != ls->ls_rcom_seq) {
291 log_debug(ls, "reject reply %d from %d seq %llx expect %llx",
292 rc_in->rc_type, rc_in->rc_header.h_nodeid,
293 (unsigned long long)rc_in->rc_id,
294 (unsigned long long)ls->ls_rcom_seq);
295 goto out;
296 }
297 memcpy(ls->ls_recover_buf, rc_in, rc_in->rc_header.h_length);
298 set_bit(LSFL_RCOM_READY, &ls->ls_flags);
299 clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
300 wake_up(&ls->ls_wait_general);
301 out:
302 spin_unlock(&ls->ls_rcom_spin);
303}
304
305int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
306{
307 struct dlm_rcom *rc;
308 struct dlm_msg *msg;
309 int error = 0;
310
311 ls->ls_recover_nodeid = nodeid;
312
313retry:
314 error = create_rcom_stateless(ls, nodeid, DLM_RCOM_NAMES, last_len,
315 &rc, &msg);
316 if (error)
317 goto out;
318 memcpy(rc->rc_buf, last_name, last_len);
319
320 allow_sync_reply(ls, &rc->rc_id);
321 memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE);
322
323 send_rcom_stateless(msg, rc);
324
325 error = dlm_wait_function(ls, &rcom_response);
326 disallow_sync_reply(ls);
327 if (error == -ETIMEDOUT)
328 goto retry;
329 out:
330 return error;
331}
332
333static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
334{
335 struct dlm_rcom *rc;
336 int error, inlen, outlen, nodeid;
337 struct dlm_msg *msg;
338
339 nodeid = rc_in->rc_header.h_nodeid;
340 inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
341 outlen = DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom);
342
343 error = create_rcom_stateless(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen,
344 &rc, &msg);
345 if (error)
346 return;
347 rc->rc_id = rc_in->rc_id;
348 rc->rc_seq_reply = rc_in->rc_seq;
349
350 dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
351 nodeid);
352 send_rcom_stateless(msg, rc);
353}
354
355int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
356{
357 struct dlm_rcom *rc;
358 struct dlm_mhandle *mh;
359 struct dlm_ls *ls = r->res_ls;
360 int error;
361
362 error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length,
363 &rc, &mh);
364 if (error)
365 goto out;
366 memcpy(rc->rc_buf, r->res_name, r->res_length);
367 rc->rc_id = (unsigned long) r->res_id;
368
369 send_rcom(mh, rc);
370 out:
371 return error;
372}
373
374static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
375{
376 struct dlm_rcom *rc;
377 struct dlm_mhandle *mh;
378 int error, ret_nodeid, nodeid = rc_in->rc_header.h_nodeid;
379 int len = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
380
381
382 if (rc_in->rc_id == 0xFFFFFFFF) {
383 log_error(ls, "receive_rcom_lookup dump from %d", nodeid);
384 dlm_dump_rsb_name(ls, rc_in->rc_buf, len);
385 return;
386 }
387
388 error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh);
389 if (error)
390 return;
391
392 error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len,
393 DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL);
394 if (error)
395 ret_nodeid = error;
396 rc->rc_result = ret_nodeid;
397 rc->rc_id = rc_in->rc_id;
398 rc->rc_seq_reply = rc_in->rc_seq;
399
400 send_rcom(mh, rc);
401}
402
403static void receive_rcom_lookup_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
404{
405 dlm_recover_master_reply(ls, rc_in);
406}
407
408static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb,
409 struct rcom_lock *rl)
410{
411 memset(rl, 0, sizeof(*rl));
412
413 rl->rl_ownpid = cpu_to_le32(lkb->lkb_ownpid);
414 rl->rl_lkid = cpu_to_le32(lkb->lkb_id);
415 rl->rl_exflags = cpu_to_le32(lkb->lkb_exflags);
416 rl->rl_flags = cpu_to_le32(lkb->lkb_flags);
417 rl->rl_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
418 rl->rl_rqmode = lkb->lkb_rqmode;
419 rl->rl_grmode = lkb->lkb_grmode;
420 rl->rl_status = lkb->lkb_status;
421 rl->rl_wait_type = cpu_to_le16(lkb->lkb_wait_type);
422
423 if (lkb->lkb_bastfn)
424 rl->rl_asts |= DLM_CB_BAST;
425 if (lkb->lkb_astfn)
426 rl->rl_asts |= DLM_CB_CAST;
427
428 rl->rl_namelen = cpu_to_le16(r->res_length);
429 memcpy(rl->rl_name, r->res_name, r->res_length);
430
431
432
433
434 if (lkb->lkb_lvbptr)
435 memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
436}
437
438int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
439{
440 struct dlm_ls *ls = r->res_ls;
441 struct dlm_rcom *rc;
442 struct dlm_mhandle *mh;
443 struct rcom_lock *rl;
444 int error, len = sizeof(struct rcom_lock);
445
446 if (lkb->lkb_lvbptr)
447 len += ls->ls_lvblen;
448
449 error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh);
450 if (error)
451 goto out;
452
453 rl = (struct rcom_lock *) rc->rc_buf;
454 pack_rcom_lock(r, lkb, rl);
455 rc->rc_id = (unsigned long) r;
456
457 send_rcom(mh, rc);
458 out:
459 return error;
460}
461
462
463static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
464{
465 struct dlm_rcom *rc;
466 struct dlm_mhandle *mh;
467 int error, nodeid = rc_in->rc_header.h_nodeid;
468
469 dlm_recover_master_copy(ls, rc_in);
470
471 error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY,
472 sizeof(struct rcom_lock), &rc, &mh);
473 if (error)
474 return;
475
476
477
478
479 memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
480 rc->rc_id = rc_in->rc_id;
481 rc->rc_seq_reply = rc_in->rc_seq;
482
483 send_rcom(mh, rc);
484}
485
486
487
488
489int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
490{
491 struct dlm_rcom *rc;
492 struct rcom_config *rf;
493 struct dlm_mhandle *mh;
494 char *mb;
495 int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
496
497 mh = dlm_midcomms_get_mhandle(nodeid, mb_len, GFP_NOFS, &mb);
498 if (!mh)
499 return -ENOBUFS;
500
501 rc = (struct dlm_rcom *) mb;
502
503 rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
504 rc->rc_header.u.h_lockspace = rc_in->rc_header.u.h_lockspace;
505 rc->rc_header.h_nodeid = dlm_our_nodeid();
506 rc->rc_header.h_length = mb_len;
507 rc->rc_header.h_cmd = DLM_RCOM;
508
509 rc->rc_type = DLM_RCOM_STATUS_REPLY;
510 rc->rc_id = rc_in->rc_id;
511 rc->rc_seq_reply = rc_in->rc_seq;
512 rc->rc_result = -ESRCH;
513
514 rf = (struct rcom_config *) rc->rc_buf;
515 rf->rf_lvblen = cpu_to_le32(~0U);
516
517 dlm_rcom_out(rc);
518 dlm_midcomms_commit_mhandle(mh);
519
520 return 0;
521}
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
569{
570 int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
571 int stop, reply = 0, names = 0, lookup = 0, lock = 0;
572 uint32_t status;
573 uint64_t seq;
574
575 switch (rc->rc_type) {
576 case DLM_RCOM_STATUS_REPLY:
577 reply = 1;
578 break;
579 case DLM_RCOM_NAMES:
580 names = 1;
581 break;
582 case DLM_RCOM_NAMES_REPLY:
583 names = 1;
584 reply = 1;
585 break;
586 case DLM_RCOM_LOOKUP:
587 lookup = 1;
588 break;
589 case DLM_RCOM_LOOKUP_REPLY:
590 lookup = 1;
591 reply = 1;
592 break;
593 case DLM_RCOM_LOCK:
594 lock = 1;
595 break;
596 case DLM_RCOM_LOCK_REPLY:
597 lock = 1;
598 reply = 1;
599 break;
600 }
601
602 spin_lock(&ls->ls_recover_lock);
603 status = ls->ls_recover_status;
604 stop = test_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
605 seq = ls->ls_recover_seq;
606 spin_unlock(&ls->ls_recover_lock);
607
608 if (stop && (rc->rc_type != DLM_RCOM_STATUS))
609 goto ignore;
610
611 if (reply && (rc->rc_seq_reply != seq))
612 goto ignore;
613
614 if (!(status & DLM_RS_NODES) && (names || lookup || lock))
615 goto ignore;
616
617 if (!(status & DLM_RS_DIR) && (lookup || lock))
618 goto ignore;
619
620 switch (rc->rc_type) {
621 case DLM_RCOM_STATUS:
622 receive_rcom_status(ls, rc);
623 break;
624
625 case DLM_RCOM_NAMES:
626 receive_rcom_names(ls, rc);
627 break;
628
629 case DLM_RCOM_LOOKUP:
630 receive_rcom_lookup(ls, rc);
631 break;
632
633 case DLM_RCOM_LOCK:
634 if (rc->rc_header.h_length < lock_size)
635 goto Eshort;
636 receive_rcom_lock(ls, rc);
637 break;
638
639 case DLM_RCOM_STATUS_REPLY:
640 receive_sync_reply(ls, rc);
641 break;
642
643 case DLM_RCOM_NAMES_REPLY:
644 receive_sync_reply(ls, rc);
645 break;
646
647 case DLM_RCOM_LOOKUP_REPLY:
648 receive_rcom_lookup_reply(ls, rc);
649 break;
650
651 case DLM_RCOM_LOCK_REPLY:
652 if (rc->rc_header.h_length < lock_size)
653 goto Eshort;
654 dlm_recover_process_copy(ls, rc);
655 break;
656
657 default:
658 log_error(ls, "receive_rcom bad type %d", rc->rc_type);
659 }
660 return;
661
662ignore:
663 log_limit(ls, "dlm_receive_rcom ignore msg %d "
664 "from %d %llu %llu recover seq %llu sts %x gen %u",
665 rc->rc_type,
666 nodeid,
667 (unsigned long long)rc->rc_seq,
668 (unsigned long long)rc->rc_seq_reply,
669 (unsigned long long)seq,
670 status, ls->ls_generation);
671 return;
672Eshort:
673 log_error(ls, "recovery message %d from %d is too short",
674 rc->rc_type, nodeid);
675}
676
677