Skip to content

Commit 53636ed

Browse files
committed
end of day wip
1 parent a905053 commit 53636ed

8 files changed

+293
-192
lines changed

src/ra_log.erl

+24-12
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@
6262

6363
-type ra_meta_key() :: atom().
6464
-type segment_ref() :: {ra_range:range(), File :: file:filename_all()}.
65-
-type event_body() :: {written, ra_term(), ra:range()} |
65+
-type event_body() :: {written, ra_term(), ra_seq:state()} |
6666
{segments, [{ets:tid(), ra:range()}], [segment_ref()]} |
6767
{resend_write, ra_index()} |
6868
{snapshot_written, ra_idxterm(), ra_snapshot:kind()} |
@@ -610,14 +610,14 @@ set_last_index(Idx, #?MODULE{cfg = Cfg,
610610

611611
-spec handle_event(event_body(), state()) ->
612612
{state(), [effect()]}.
613-
handle_event({written, _Term, {FromIdx, _ToIdx}},
613+
handle_event({written, _Term, [{FromIdx, _ToIdx}]},
614614
#?MODULE{last_index = LastIdx} = State)
615615
when FromIdx > LastIdx ->
616616
%% we must have reverted back, either by explicit reset or by a snapshot
617617
%% installation taking place whilst the WAL was processing the write
618618
%% Just drop the event in this case as it is stale
619619
{State, []};
620-
handle_event({written, Term, {FromIdx, ToIdx}},
620+
handle_event({written, Term, [{FromIdx, ToIdx}]},
621621
#?MODULE{cfg = Cfg,
622622
last_written_index_term = {LastWrittenIdx0,
623623
_LastWrittenTerm0},
@@ -648,10 +648,10 @@ handle_event({written, Term, {FromIdx, ToIdx}},
648648
{State, []};
649649
NextWrittenRange ->
650650
%% retry with a reduced range
651-
handle_event({written, Term, NextWrittenRange}, State0)
651+
handle_event({written, Term, [NextWrittenRange]}, State0)
652652
end
653653
end;
654-
handle_event({written, _Term, {FromIdx, _}} = Evt,
654+
handle_event({written, _Term, [{FromIdx, _}]} = Evt,
655655
#?MODULE{cfg = #cfg{log_id = LogId},
656656
mem_table = Mt,
657657
last_written_index_term = {LastWrittenIdx, _}} = State0)
@@ -675,6 +675,17 @@ handle_event({written, _Term, {FromIdx, _}} = Evt,
675675
handle_event(Evt,
676676
State#?MODULE{last_written_index_term = {Expected, Term}})
677677
end;
678+
handle_event({written, Term, Written}, State) ->
679+
%% simple handling of ra_seqs for now
680+
case Written of
681+
[I] when is_integer(I) ->
682+
handle_event({written, Term, [{I, I}]}, State);
683+
[I2, I] when is_integer(I) andalso
684+
I + 1 == I2 ->
685+
handle_event({written, Term, [{I, I2}]}, State);
686+
_ ->
687+
exit({sparse_written_events_not_implemented, Written})
688+
end;
678689
handle_event({segments, TidRanges, NewSegs},
679690
#?MODULE{cfg = #cfg{uid = UId, names = Names} = Cfg,
680691
reader = Reader0,
@@ -1200,7 +1211,7 @@ wal_write_batch(#?MODULE{cfg = #cfg{uid = UId,
12001211
maybe_append_first_entry(State0 = #?MODULE{last_index = -1}) ->
12011212
State = append({0, 0, undefined}, State0),
12021213
receive
1203-
{ra_log_event, {written, 0, {0, 0}}} ->
1214+
{ra_log_event, {written, 0, [0]}} ->
12041215
ok
12051216
after 60000 ->
12061217
exit({?FUNCTION_NAME, timeout})
@@ -1323,14 +1334,15 @@ pick_range([{Fst, _Lst} | Tail], {CurFst, CurLst}) ->
13231334

13241335
%% TODO: implement synchronous writes using gen_batch_server:call/3
13251336
await_written_idx(Idx, Term, Log0) ->
1326-
IDX = Idx,
13271337
receive
1328-
{ra_log_event, {written, Term, {_, IDX}} = Evt} ->
1338+
{ra_log_event, {written, Term, _Seq} = Evt} ->
13291339
{Log, _} = handle_event(Evt, Log0),
1330-
Log;
1331-
{ra_log_event, {written, _, _} = Evt} ->
1332-
{Log, _} = handle_event(Evt, Log0),
1333-
await_written_idx(Idx, Term, Log)
1340+
case last_written(Log) of
1341+
{Idx, Term} ->
1342+
Log;
1343+
_ ->
1344+
await_written_idx(Idx, Term, Log)
1345+
end
13341346
after ?LOG_APPEND_TIMEOUT ->
13351347
throw(ra_log_append_timeout)
13361348
end.

src/ra_log_segment_writer.erl

+24-23
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ get_overview(#state{data_dir = Dir,
261261
#{data_dir => Dir,
262262
segment_conf => Conf}.
263263

264-
flush_mem_table_ranges({ServerUId, TidRanges0},
264+
flush_mem_table_ranges({ServerUId, TidSeqs0},
265265
#state{system = System} = State) ->
266266
SmallestIdx = smallest_live_idx(ServerUId),
267267
%% TidRanges arrive here sorted new -> old.
@@ -270,31 +270,32 @@ flush_mem_table_ranges({ServerUId, TidRanges0},
270270
%% list of tid ranges to flush to disk
271271
%% now TidRanges are sorted old -> new, i.e the correct order of
272272
%% processing
273-
TidRanges = lists:foldl(
274-
fun ({T, Range0}, []) ->
275-
case ra_range:truncate(SmallestIdx - 1, Range0) of
276-
undefined ->
277-
[];
278-
Range ->
279-
[{T, Range}]
280-
end;
281-
({T, Range0}, [{_T, {Start, _}} | _] = Acc) ->
282-
Range1 = ra_range:truncate(SmallestIdx - 1, Range0),
283-
case ra_range:limit(Start, Range1) of
284-
undefined ->
285-
Acc;
286-
Range ->
287-
[{T, Range} | Acc]
288-
end
289-
end, [], TidRanges0),
273+
TidSeqs = lists:foldl(
274+
fun ({T, Seq0}, []) ->
275+
case ra_seq:floor(SmallestIdx, Seq0) of
276+
undefined ->
277+
[];
278+
Seq ->
279+
[{T, Seq}]
280+
end;
281+
({T, Seq0}, [{_T, PrevSeq} | _] = Acc) ->
282+
Start = ra_seq:first(PrevSeq),
283+
Seq1 = ra_seq:floor(SmallestIdx, Seq0),
284+
case ra_seq:limit(Start, Seq1) of
285+
undefined ->
286+
Acc;
287+
Seq ->
288+
[{T, Seq} | Acc]
289+
end
290+
end, [], TidSeqs0),
290291

291292
SegRefs0 = lists:append(
292293
lists:reverse(
293294
%% segrefs are returned in appended order so new -> old
294295
%% so we need to reverse them so that the final appended list
295296
%% of segrefs is in the old -> new order
296-
[flush_mem_table_range(ServerUId, TidRange, State)
297-
|| TidRange <- TidRanges])),
297+
[flush_mem_table_range(ServerUId, TidSeq, State)
298+
|| TidSeq <- TidSeqs])),
298299

299300
%% compact cases where a segment was appended in a subsequent call to
300301
%% flush_mem_table_range
@@ -308,14 +309,14 @@ flush_mem_table_ranges({ServerUId, TidRanges0},
308309
[Seg | Acc]
309310
end, [], SegRefs0)),
310311

311-
ok = send_segments(System, ServerUId, TidRanges0, SegRefs),
312+
ok = send_segments(System, ServerUId, TidSeqs0, SegRefs),
312313
ok.
313314

314-
flush_mem_table_range(ServerUId, {Tid, {StartIdx0, EndIdx}},
315+
flush_mem_table_range(ServerUId, {Tid, {StartIdx, EndIdx}},
315316
#state{data_dir = DataDir,
316317
segment_conf = SegConf} = State) ->
317318
Dir = filename:join(DataDir, binary_to_list(ServerUId)),
318-
StartIdx = start_index(ServerUId, StartIdx0),
319+
% StartIdx = start_index(ServerUId, StartIdx0),
319320
case open_file(Dir, SegConf) of
320321
enoent ->
321322
?DEBUG("segment_writer: skipping segment as directory ~ts does "

src/ra_log_wal.erl

+47-31
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@
9999

100100
-record(recovery, {mode :: initial | post_boot,
101101
ranges = #{} :: #{ra_uid() =>
102-
[{ets:tid(), {ra:index(), ra:index()}}]},
102+
[{ets:tid(), ra_seq:state()}]},
103103
tables = #{} :: #{ra_uid() => ra_mt:state()},
104104
writers = #{} :: #{ra_uid() => {in_seq, ra:index()}}
105105
}).
@@ -584,22 +584,29 @@ roll_over(#state{wal = Wal0, file_num = Num0,
584584
%% if this is the first wal since restart randomise the first
585585
%% max wal size to reduce the likelihood that each erlang node will
586586
%% flush mem tables at the same time
587-
NextMaxBytes = case Wal0 of
588-
undefined ->
589-
Half = MaxBytes div 2,
590-
Half + rand:uniform(Half);
591-
#wal{ranges = Ranges,
592-
filename = Filename} ->
593-
_ = file:advise(Wal0#wal.fd, 0, 0, dont_need),
594-
ok = close_file(Wal0#wal.fd),
595-
MemTables = Ranges,
596-
%% TODO: only keep base name in state
597-
Basename = filename:basename(Filename),
598-
ok = ra_log_segment_writer:accept_mem_tables(SegWriter,
599-
MemTables,
600-
Basename),
601-
MaxBytes
602-
end,
587+
NextMaxBytes =
588+
case Wal0 of
589+
undefined ->
590+
Half = MaxBytes div 2,
591+
Half + rand:uniform(Half);
592+
#wal{ranges = Ranges,
593+
filename = Filename} ->
594+
_ = file:advise(Wal0#wal.fd, 0, 0, dont_need),
595+
ok = close_file(Wal0#wal.fd),
596+
%% floor all sequences
597+
MemTables = maps:map(
598+
fun (UId, TidRanges) ->
599+
SmallestIdx = smallest_live_index(Conf0, UId),
600+
[{Tid, ra_seq:floor(SmallestIdx, Seq)}
601+
|| {Tid, Seq} <- TidRanges]
602+
end, Ranges),
603+
%% TODO: only keep base name in state
604+
Basename = filename:basename(Filename),
605+
ok = ra_log_segment_writer:accept_mem_tables(SegWriter,
606+
MemTables,
607+
Basename),
608+
MaxBytes
609+
end,
603610
{Conf, Wal} = open_wal(NextFile, NextMaxBytes, Conf0),
604611
State0#state{conf = Conf,
605612
wal = Wal,
@@ -695,11 +702,12 @@ complete_batch(#state{batch = #batch{waiting = Waiting,
695702
complete_batch_writer(Pid, #batch_writer{smallest_live_idx = SmallestIdx,
696703
tid = MtTid,
697704
uid = UId,
698-
seq = Range,
705+
seq = Seq0,
699706
term = Term,
700707
old = undefined}, Ranges) ->
701-
Pid ! {ra_log_event, {written, Term, Range}},
702-
update_ranges(Ranges, UId, MtTid, SmallestIdx, Range);
708+
Seq = ra_seq:floor(SmallestIdx, Seq0),
709+
Pid ! {ra_log_event, {written, Term, Seq}},
710+
update_ranges(Ranges, UId, MtTid, SmallestIdx, Seq);
703711
complete_batch_writer(Pid, #batch_writer{old = #batch_writer{} = OldBw} = Bw,
704712
Ranges0) ->
705713
Ranges = complete_batch_writer(Pid, OldBw, Ranges0),
@@ -968,22 +976,19 @@ should_roll_wal(#state{conf = #conf{max_entries = MaxEntries},
968976
smallest_live_index(#conf{ra_log_snapshot_state_tid = Tid}, ServerUId) ->
969977
ra_log_snapshot_state:smallest(Tid, ServerUId).
970978

971-
update_ranges(Ranges, UId, MtTid, SmallestIdx, AddSeq) ->
979+
update_ranges(Ranges, UId, MtTid, _SmallestIdx, AddSeq) ->
972980
case Ranges of
973-
#{UId := [{MtTid, Seq0} | Rem]} ->
981+
#{UId := [{MtTid, Seq0} | Seqs]} ->
974982
%% SmallestIdx might have moved to we truncate the old range first
975983
%% before extending
976-
Seq1 = ra_seq:floor(SmallestIdx, Seq0),
984+
% Seq1 = ra_seq:floor(SmallestIdx, Seq0),
977985
%% limit the old range by the add end start as in some resend
978986
%% cases we may have got back before the prior range.
979-
Seq = ra_seq:add(AddSeq, Seq1),
980-
Ranges#{UId => [{MtTid, Seq} | Rem]};
981-
#{UId := [{OldMtTid, OldMtSeq} | Rem]} ->
987+
Seq = ra_seq:add(AddSeq, Seq0),
988+
Ranges#{UId => [{MtTid, Seq} | Seqs]};
989+
#{UId := Seqs} ->
982990
%% new Tid, need to add a new range record for this
983-
Ranges#{UId => [{MtTid, AddSeq},
984-
{OldMtTid,
985-
ra_seq:floor(SmallestIdx, OldMtSeq)}
986-
| Rem]};
991+
Ranges#{UId => [{MtTid, AddSeq} | Seqs]};
987992
_ ->
988993
Ranges#{UId => [{MtTid, AddSeq}]}
989994
end.
@@ -1043,13 +1048,24 @@ recover_entry(Names, UId, {Idx, Term, _}, SmallestIdx,
10431048
handle_trunc(false, _UId, _Idx, State) ->
10441049
State;
10451050
handle_trunc(true, UId, Idx, #recovery{mode = Mode,
1051+
ranges = Ranges0,
10461052
tables = Tbls} = State) ->
10471053
case Tbls of
10481054
#{UId := Mt0} when Mode == initial ->
10491055
%% only meddle with mem table data in initial mode
10501056
{Specs, Mt} = ra_mt:set_first(Idx-1, Mt0),
10511057
[_ = ra_mt:delete(Spec) || Spec <- Specs],
1052-
State#recovery{tables = Tbls#{UId => Mt}};
1058+
Ranges = case Ranges0 of
1059+
#{UId := Seqs0} ->
1060+
Seqs = [{T, ra_seq:floor(Idx, Seq)}
1061+
|| {T, Seq} <- Seqs0],
1062+
Ranges0#{UId => Seqs};
1063+
_ ->
1064+
Ranges0
1065+
end,
1066+
1067+
State#recovery{tables = Tbls#{UId => Mt},
1068+
ranges = Ranges};
10531069
_ ->
10541070
State
10551071
end.

src/ra_seq.erl

+41-11
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,11 @@
1111
%% [55, {20, 52}, 3]
1212
-type state() :: [ra:index() | ra:range()].
1313

14-
-export_type([state/0]).
14+
-record(i, {seq :: state()}).
15+
-opaque iter() :: #i{}.
16+
17+
-export_type([state/0,
18+
iter/0]).
1519

1620

1721
-export([
@@ -20,7 +24,12 @@
2024
floor/2,
2125
limit/2,
2226
add/2,
23-
fold/3
27+
fold/3,
28+
expand/1,
29+
subtract/2,
30+
first/1,
31+
iterator/1,
32+
next/1
2433
]).
2534

2635
-spec append(ra:index(), state()) -> state().
@@ -94,6 +103,29 @@ fold(Fun, Acc0, Seq) ->
94103
Fun(Idx, Acc)
95104
end, Acc0, Seq).
96105

106+
-spec expand(state()) -> [ra:index()].
107+
expand(Seq) ->
108+
fold(fun (I, Acc) -> [I | Acc] end, [], Seq).
109+
110+
subtract(SeqA, SeqB) ->
111+
%% TODO: not efficient at all but good enough for now
112+
%% optimise if we end up using this in critical path
113+
A = expand(SeqA),
114+
B = expand(SeqB),
115+
from_list(A -- B).
116+
117+
-spec first(state()) -> undefined | ra:index().
118+
first(Seq) ->
119+
last_index(lists:reverse(Seq)).
120+
121+
-spec iterator(state()) -> iter() | end_of_seq.
122+
iterator(Seq) when is_list(Seq) ->
123+
#i{seq = lists:reverse(Seq)}.
124+
125+
-spec next(iter()) -> {ra:index(), iter() | end_of_seq}.
126+
next(#i{}) ->
127+
end_of_seq.
128+
97129
%% internal functions
98130

99131
floor0(FloorIdx, [Last | Rem], Acc)
@@ -114,13 +146,11 @@ floor0(FloorIdx, [{_, _} = T | Rem], Acc) ->
114146
floor0(_FloorIdx, _Seq, Acc) ->
115147
lists:reverse(Acc).
116148

117-
% first_index(Seq) ->
118-
% last_index(lists:reverse(Seq)).
119149

120-
% last_index([{_, I} | _]) ->
121-
% I;
122-
% last_index([I | _])
123-
% when is_integer(I) ->
124-
% I;
125-
% last_index([]) ->
126-
% undefined.
150+
last_index([{_, I} | _]) ->
151+
I;
152+
last_index([I | _])
153+
when is_integer(I) ->
154+
I;
155+
last_index([]) ->
156+
undefined.

test/ra_log_SUITE.erl

+13-2
Original file line numberDiff line numberDiff line change
@@ -115,10 +115,12 @@ append_then_fetch_no_wait(Config) ->
115115
% results in the last written being updated
116116
receive
117117
{ra_log_event, {written, _, _} = Evt} ->
118+
ct:pal("written ~p", [Evt]),
118119
{Log, _} = ra_log:handle_event(Evt, Log3),
119120
{Idx, Term} = ra_log:last_written(Log)
120-
after 0 ->
121-
ok
121+
after 1000 ->
122+
flush(),
123+
ct:pal("fail written event not received")
122124
end,
123125
ok.
124126

@@ -277,3 +279,12 @@ append_in(Term, Data, Log0) ->
277279
ra_log_take(From, To, Log0) ->
278280
{Acc, Log} = ra_log:fold(From, To, fun (E, Acc) -> [E | Acc] end, [], Log0),
279281
{lists:reverse(Acc), Log}.
282+
283+
flush() ->
284+
receive
285+
Any ->
286+
ct:pal("flush ~p", [Any]),
287+
flush()
288+
after 0 ->
289+
ok
290+
end.

0 commit comments

Comments
 (0)