1
2
3
4
5
6
7
8
9
10
11
12#include "dlm_internal.h"
13#include "lockspace.h"
14#include "member.h"
15#include "lowcomms.h"
16#include "midcomms.h"
17#include "rcom.h"
18#include "recover.h"
19#include "dir.h"
20#include "config.h"
21#include "memory.h"
22#include "lock.h"
23#include "util.h"
24
25static int rcom_response(struct dlm_ls *ls)
26{
27 return test_bit(LSFL_RCOM_READY, &ls->ls_flags);
28}
29
30static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
31 struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret)
32{
33 struct dlm_rcom *rc;
34 struct dlm_mhandle *mh;
35 char *mb;
36 int mb_len = sizeof(struct dlm_rcom) + len;
37
38 mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
39 if (!mh) {
40 log_print("create_rcom to %d type %d len %d ENOBUFS",
41 to_nodeid, type, len);
42 return -ENOBUFS;
43 }
44
45 rc = (struct dlm_rcom *) mb;
46
47 rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
48 rc->rc_header.h_lockspace = ls->ls_global_id;
49 rc->rc_header.h_nodeid = dlm_our_nodeid();
50 rc->rc_header.h_length = mb_len;
51 rc->rc_header.h_cmd = DLM_RCOM;
52
53 rc->rc_type = type;
54
55 spin_lock(&ls->ls_recover_lock);
56 rc->rc_seq = ls->ls_recover_seq;
57 spin_unlock(&ls->ls_recover_lock);
58
59 *mh_ret = mh;
60 *rc_ret = rc;
61 return 0;
62}
63
64static void send_rcom(struct dlm_ls *ls, struct dlm_mhandle *mh,
65 struct dlm_rcom *rc)
66{
67 dlm_rcom_out(rc);
68 dlm_lowcomms_commit_buffer(mh);
69}
70
71static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs,
72 uint32_t flags)
73{
74 rs->rs_flags = cpu_to_le32(flags);
75}
76
77
78
79
80
81static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf,
82 uint32_t num_slots)
83{
84 rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen);
85 rf->rf_lsflags = cpu_to_le32(ls->ls_exflags);
86
87 rf->rf_our_slot = cpu_to_le16(ls->ls_slot);
88 rf->rf_num_slots = cpu_to_le16(num_slots);
89 rf->rf_generation = cpu_to_le32(ls->ls_generation);
90}
91
92static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
93{
94 struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
95
96 if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) {
97 log_error(ls, "version mismatch: %x nodeid %d: %x",
98 DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid,
99 rc->rc_header.h_version);
100 return -EPROTO;
101 }
102
103 if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen ||
104 le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) {
105 log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
106 ls->ls_lvblen, ls->ls_exflags, nodeid,
107 le32_to_cpu(rf->rf_lvblen),
108 le32_to_cpu(rf->rf_lsflags));
109 return -EPROTO;
110 }
111 return 0;
112}
113
114static void allow_sync_reply(struct dlm_ls *ls, uint64_t *new_seq)
115{
116 spin_lock(&ls->ls_rcom_spin);
117 *new_seq = ++ls->ls_rcom_seq;
118 set_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
119 spin_unlock(&ls->ls_rcom_spin);
120}
121
122static void disallow_sync_reply(struct dlm_ls *ls)
123{
124 spin_lock(&ls->ls_rcom_spin);
125 clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
126 clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
127 spin_unlock(&ls->ls_rcom_spin);
128}
129
130
131
132
133
134
135
136
137
138
139
140
141int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
142{
143 struct dlm_rcom *rc;
144 struct dlm_mhandle *mh;
145 int error = 0;
146
147 ls->ls_recover_nodeid = nodeid;
148
149 if (nodeid == dlm_our_nodeid()) {
150 rc = ls->ls_recover_buf;
151 rc->rc_result = dlm_recover_status(ls);
152 goto out;
153 }
154
155retry:
156 error = create_rcom(ls, nodeid, DLM_RCOM_STATUS,
157 sizeof(struct rcom_status), &rc, &mh);
158 if (error)
159 goto out;
160
161 set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags);
162
163 allow_sync_reply(ls, &rc->rc_id);
164 memset(ls->ls_recover_buf, 0, LOWCOMMS_MAX_TX_BUFFER_LEN);
165
166 send_rcom(ls, mh, rc);
167
168 error = dlm_wait_function(ls, &rcom_response);
169 disallow_sync_reply(ls);
170 if (error == -ETIMEDOUT)
171 goto retry;
172 if (error)
173 goto out;
174
175 rc = ls->ls_recover_buf;
176
177 if (rc->rc_result == -ESRCH) {
178
179 log_debug(ls, "remote node %d not ready", nodeid);
180 rc->rc_result = 0;
181 error = 0;
182 } else {
183 error = check_rcom_config(ls, rc, nodeid);
184 }
185
186
187 out:
188 return error;
189}
190
191static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
192{
193 struct dlm_rcom *rc;
194 struct dlm_mhandle *mh;
195 struct rcom_status *rs;
196 uint32_t status;
197 int nodeid = rc_in->rc_header.h_nodeid;
198 int len = sizeof(struct rcom_config);
199 int num_slots = 0;
200 int error;
201
202 if (!dlm_slots_version(&rc_in->rc_header)) {
203 status = dlm_recover_status(ls);
204 goto do_create;
205 }
206
207 rs = (struct rcom_status *)rc_in->rc_buf;
208
209 if (!(le32_to_cpu(rs->rs_flags) & DLM_RSF_NEED_SLOTS)) {
210 status = dlm_recover_status(ls);
211 goto do_create;
212 }
213
214 spin_lock(&ls->ls_recover_lock);
215 status = ls->ls_recover_status;
216 num_slots = ls->ls_num_slots;
217 spin_unlock(&ls->ls_recover_lock);
218 len += num_slots * sizeof(struct rcom_slot);
219
220 do_create:
221 error = create_rcom(ls, nodeid, DLM_RCOM_STATUS_REPLY,
222 len, &rc, &mh);
223 if (error)
224 return;
225
226 rc->rc_id = rc_in->rc_id;
227 rc->rc_seq_reply = rc_in->rc_seq;
228 rc->rc_result = status;
229
230 set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots);
231
232 if (!num_slots)
233 goto do_send;
234
235 spin_lock(&ls->ls_recover_lock);
236 if (ls->ls_num_slots != num_slots) {
237 spin_unlock(&ls->ls_recover_lock);
238 log_debug(ls, "receive_rcom_status num_slots %d to %d",
239 num_slots, ls->ls_num_slots);
240 rc->rc_result = 0;
241 set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0);
242 goto do_send;
243 }
244
245 dlm_slots_copy_out(ls, rc);
246 spin_unlock(&ls->ls_recover_lock);
247
248 do_send:
249 send_rcom(ls, mh, rc);
250}
251
252static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
253{
254 spin_lock(&ls->ls_rcom_spin);
255 if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
256 rc_in->rc_id != ls->ls_rcom_seq) {
257 log_debug(ls, "reject reply %d from %d seq %llx expect %llx",
258 rc_in->rc_type, rc_in->rc_header.h_nodeid,
259 (unsigned long long)rc_in->rc_id,
260 (unsigned long long)ls->ls_rcom_seq);
261 goto out;
262 }
263 memcpy(ls->ls_recover_buf, rc_in, rc_in->rc_header.h_length);
264 set_bit(LSFL_RCOM_READY, &ls->ls_flags);
265 clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
266 wake_up(&ls->ls_wait_general);
267 out:
268 spin_unlock(&ls->ls_rcom_spin);
269}
270
271int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
272{
273 struct dlm_rcom *rc;
274 struct dlm_mhandle *mh;
275 int error = 0;
276
277 ls->ls_recover_nodeid = nodeid;
278
279retry:
280 error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh);
281 if (error)
282 goto out;
283 memcpy(rc->rc_buf, last_name, last_len);
284
285 allow_sync_reply(ls, &rc->rc_id);
286 memset(ls->ls_recover_buf, 0, LOWCOMMS_MAX_TX_BUFFER_LEN);
287
288 send_rcom(ls, mh, rc);
289
290 error = dlm_wait_function(ls, &rcom_response);
291 disallow_sync_reply(ls);
292 if (error == -ETIMEDOUT)
293 goto retry;
294 out:
295 return error;
296}
297
298static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
299{
300 struct dlm_rcom *rc;
301 struct dlm_mhandle *mh;
302 int error, inlen, outlen, nodeid;
303
304 nodeid = rc_in->rc_header.h_nodeid;
305 inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
306 outlen = LOWCOMMS_MAX_TX_BUFFER_LEN - sizeof(struct dlm_rcom);
307
308 error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, &rc, &mh);
309 if (error)
310 return;
311 rc->rc_id = rc_in->rc_id;
312 rc->rc_seq_reply = rc_in->rc_seq;
313
314 dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
315 nodeid);
316 send_rcom(ls, mh, rc);
317}
318
319int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
320{
321 struct dlm_rcom *rc;
322 struct dlm_mhandle *mh;
323 struct dlm_ls *ls = r->res_ls;
324 int error;
325
326 error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length,
327 &rc, &mh);
328 if (error)
329 goto out;
330 memcpy(rc->rc_buf, r->res_name, r->res_length);
331 rc->rc_id = (unsigned long) r->res_id;
332
333 send_rcom(ls, mh, rc);
334 out:
335 return error;
336}
337
338static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
339{
340 struct dlm_rcom *rc;
341 struct dlm_mhandle *mh;
342 int error, ret_nodeid, nodeid = rc_in->rc_header.h_nodeid;
343 int len = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
344
345 error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh);
346 if (error)
347 return;
348
349
350 if (rc_in->rc_id == 0xFFFFFFFF) {
351 log_error(ls, "receive_rcom_lookup dump from %d", nodeid);
352 dlm_dump_rsb_name(ls, rc_in->rc_buf, len);
353 return;
354 }
355
356 error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len,
357 DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL);
358 if (error)
359 ret_nodeid = error;
360 rc->rc_result = ret_nodeid;
361 rc->rc_id = rc_in->rc_id;
362 rc->rc_seq_reply = rc_in->rc_seq;
363
364 send_rcom(ls, mh, rc);
365}
366
367static void receive_rcom_lookup_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
368{
369 dlm_recover_master_reply(ls, rc_in);
370}
371
372static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb,
373 struct rcom_lock *rl)
374{
375 memset(rl, 0, sizeof(*rl));
376
377 rl->rl_ownpid = cpu_to_le32(lkb->lkb_ownpid);
378 rl->rl_lkid = cpu_to_le32(lkb->lkb_id);
379 rl->rl_exflags = cpu_to_le32(lkb->lkb_exflags);
380 rl->rl_flags = cpu_to_le32(lkb->lkb_flags);
381 rl->rl_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
382 rl->rl_rqmode = lkb->lkb_rqmode;
383 rl->rl_grmode = lkb->lkb_grmode;
384 rl->rl_status = lkb->lkb_status;
385 rl->rl_wait_type = cpu_to_le16(lkb->lkb_wait_type);
386
387 if (lkb->lkb_bastfn)
388 rl->rl_asts |= DLM_CB_BAST;
389 if (lkb->lkb_astfn)
390 rl->rl_asts |= DLM_CB_CAST;
391
392 rl->rl_namelen = cpu_to_le16(r->res_length);
393 memcpy(rl->rl_name, r->res_name, r->res_length);
394
395
396
397
398 if (lkb->lkb_lvbptr)
399 memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
400}
401
402int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
403{
404 struct dlm_ls *ls = r->res_ls;
405 struct dlm_rcom *rc;
406 struct dlm_mhandle *mh;
407 struct rcom_lock *rl;
408 int error, len = sizeof(struct rcom_lock);
409
410 if (lkb->lkb_lvbptr)
411 len += ls->ls_lvblen;
412
413 error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh);
414 if (error)
415 goto out;
416
417 rl = (struct rcom_lock *) rc->rc_buf;
418 pack_rcom_lock(r, lkb, rl);
419 rc->rc_id = (unsigned long) r;
420
421 send_rcom(ls, mh, rc);
422 out:
423 return error;
424}
425
426
427static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
428{
429 struct dlm_rcom *rc;
430 struct dlm_mhandle *mh;
431 int error, nodeid = rc_in->rc_header.h_nodeid;
432
433 dlm_recover_master_copy(ls, rc_in);
434
435 error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY,
436 sizeof(struct rcom_lock), &rc, &mh);
437 if (error)
438 return;
439
440
441
442
443 memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
444 rc->rc_id = rc_in->rc_id;
445 rc->rc_seq_reply = rc_in->rc_seq;
446
447 send_rcom(ls, mh, rc);
448}
449
450
451
452
453int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
454{
455 struct dlm_rcom *rc;
456 struct rcom_config *rf;
457 struct dlm_mhandle *mh;
458 char *mb;
459 int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
460
461 mh = dlm_lowcomms_get_buffer(nodeid, mb_len, GFP_NOFS, &mb);
462 if (!mh)
463 return -ENOBUFS;
464
465 rc = (struct dlm_rcom *) mb;
466
467 rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
468 rc->rc_header.h_lockspace = rc_in->rc_header.h_lockspace;
469 rc->rc_header.h_nodeid = dlm_our_nodeid();
470 rc->rc_header.h_length = mb_len;
471 rc->rc_header.h_cmd = DLM_RCOM;
472
473 rc->rc_type = DLM_RCOM_STATUS_REPLY;
474 rc->rc_id = rc_in->rc_id;
475 rc->rc_seq_reply = rc_in->rc_seq;
476 rc->rc_result = -ESRCH;
477
478 rf = (struct rcom_config *) rc->rc_buf;
479 rf->rf_lvblen = cpu_to_le32(~0U);
480
481 dlm_rcom_out(rc);
482 dlm_lowcomms_commit_buffer(mh);
483
484 return 0;
485}
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
533{
534 int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
535 int stop, reply = 0, names = 0, lookup = 0, lock = 0;
536 uint32_t status;
537 uint64_t seq;
538
539 switch (rc->rc_type) {
540 case DLM_RCOM_STATUS_REPLY:
541 reply = 1;
542 break;
543 case DLM_RCOM_NAMES:
544 names = 1;
545 break;
546 case DLM_RCOM_NAMES_REPLY:
547 names = 1;
548 reply = 1;
549 break;
550 case DLM_RCOM_LOOKUP:
551 lookup = 1;
552 break;
553 case DLM_RCOM_LOOKUP_REPLY:
554 lookup = 1;
555 reply = 1;
556 break;
557 case DLM_RCOM_LOCK:
558 lock = 1;
559 break;
560 case DLM_RCOM_LOCK_REPLY:
561 lock = 1;
562 reply = 1;
563 break;
564 }
565
566 spin_lock(&ls->ls_recover_lock);
567 status = ls->ls_recover_status;
568 stop = test_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
569 seq = ls->ls_recover_seq;
570 spin_unlock(&ls->ls_recover_lock);
571
572 if (stop && (rc->rc_type != DLM_RCOM_STATUS))
573 goto ignore;
574
575 if (reply && (rc->rc_seq_reply != seq))
576 goto ignore;
577
578 if (!(status & DLM_RS_NODES) && (names || lookup || lock))
579 goto ignore;
580
581 if (!(status & DLM_RS_DIR) && (lookup || lock))
582 goto ignore;
583
584 switch (rc->rc_type) {
585 case DLM_RCOM_STATUS:
586 receive_rcom_status(ls, rc);
587 break;
588
589 case DLM_RCOM_NAMES:
590 receive_rcom_names(ls, rc);
591 break;
592
593 case DLM_RCOM_LOOKUP:
594 receive_rcom_lookup(ls, rc);
595 break;
596
597 case DLM_RCOM_LOCK:
598 if (rc->rc_header.h_length < lock_size)
599 goto Eshort;
600 receive_rcom_lock(ls, rc);
601 break;
602
603 case DLM_RCOM_STATUS_REPLY:
604 receive_sync_reply(ls, rc);
605 break;
606
607 case DLM_RCOM_NAMES_REPLY:
608 receive_sync_reply(ls, rc);
609 break;
610
611 case DLM_RCOM_LOOKUP_REPLY:
612 receive_rcom_lookup_reply(ls, rc);
613 break;
614
615 case DLM_RCOM_LOCK_REPLY:
616 if (rc->rc_header.h_length < lock_size)
617 goto Eshort;
618 dlm_recover_process_copy(ls, rc);
619 break;
620
621 default:
622 log_error(ls, "receive_rcom bad type %d", rc->rc_type);
623 }
624 return;
625
626ignore:
627 log_limit(ls, "dlm_receive_rcom ignore msg %d "
628 "from %d %llu %llu recover seq %llu sts %x gen %u",
629 rc->rc_type,
630 nodeid,
631 (unsigned long long)rc->rc_seq,
632 (unsigned long long)rc->rc_seq_reply,
633 (unsigned long long)seq,
634 status, ls->ls_generation);
635 return;
636Eshort:
637 log_error(ls, "recovery message %d from %d is too short",
638 rc->rc_type, nodeid);
639}
640
641