Skip to content

Commit

Permalink
Merge pull request #298 from rabbitmq/md/keep-while-revidx-prefix-tree
Browse files Browse the repository at this point in the history
Replace keep-while cond reverse index with a prefix tree
  • Loading branch information
dumbbell authored Oct 1, 2024
2 parents b113424 + 1d74ff8 commit caf1ae3
Show file tree
Hide file tree
Showing 7 changed files with 462 additions and 39 deletions.
57 changes: 36 additions & 21 deletions src/khepri_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@
%% </ul>
%% </td>
%% </tr>
%% <tr>
%% <td style="text-align: right; vertical-align: top;">2</td>
%% <td>
%% <ul>
%% <li>Changed the data structure for the reverse index used to track
%% keep-while conditions to be a prefix tree (see {@link khepri_prefix_tree}).
%% </li>
%% </ul>
%% </td>
%% </tr>
%% </table>

-module(khepri_machine).
Expand Down Expand Up @@ -101,7 +111,6 @@
get_tree/1,
get_root/1,
get_keep_while_conds/1,
get_keep_while_conds_revidx/1,
get_triggers/1,
get_emitted_triggers/1,
get_projections/1,
Expand Down Expand Up @@ -156,7 +165,7 @@
%% State machine's internal state record.
-record(khepri_machine,
{config = #config{} :: khepri_machine:machine_config(),
tree = #tree{} :: khepri_tree:tree(),
tree = khepri_tree:new() :: khepri_tree:tree(),
triggers = #{} :: khepri_machine:triggers_map(),
emitted_triggers = [] :: [khepri_machine:triggered()],
projections = khepri_pattern_tree:empty() ::
Expand All @@ -168,6 +177,11 @@

-opaque state_v1() :: #khepri_machine{}.
%% State of this Ra state machine, version 1.
%%
%% Note that this type is used also for machine version 2. Machine version 2
%% changes the type of an opaque member of the {@link khepri_tree} record and
%% doesn't need any changes to the `khepri_machine' type. See the moduledoc of
%% this module for more information about version 2.

-type state() :: state_v1() | khepri_machine_v0:state().
%% State of this Ra state machine.
Expand Down Expand Up @@ -1636,17 +1650,18 @@ overview(State) ->
keep_while_conds => KeepWhileConds}.

-spec version() -> MacVer when
MacVer :: 1.
MacVer :: 2.
%% @doc Returns the state machine version.

version() ->
1.
2.

-spec which_module(MacVer) -> Module when
MacVer :: 1 | 0,
MacVer :: 0..2,
Module :: ?MODULE.
%% @doc Returns the state machine module corresponding to the given version.

which_module(2) -> ?MODULE;
which_module(1) -> ?MODULE;
which_module(0) -> ?MODULE.

Expand Down Expand Up @@ -2160,18 +2175,6 @@ get_keep_while_conds(State) ->
#tree{keep_while_conds = KeepWhileConds} = get_tree(State),
KeepWhileConds.

-spec get_keep_while_conds_revidx(State) -> KeepWhileCondsRevIdx when
State :: khepri_machine:state(),
KeepWhileCondsRevIdx :: khepri_tree:keep_while_conds_revidx().
%% @doc Returns the `keep_while' conditions reverse index in the tree from the
%% given state.
%%
%% @private

get_keep_while_conds_revidx(State) ->
#tree{keep_while_conds_revidx = KeepWhileCondsRevIdx} = get_tree(State),
KeepWhileCondsRevIdx.

-spec get_triggers(State) -> Triggers when
State :: khepri_machine:state(),
Triggers :: khepri_machine:triggers_map().
Expand Down Expand Up @@ -2326,25 +2329,37 @@ make_virgin_state(Params) ->
-endif.

-spec convert_state(OldState, OldMacVer, NewMacVer) -> NewState when
OldState :: khepri_machine_v0:state(),
OldState :: khepri_machine:state(),
OldMacVer :: ra_machine:version(),
NewMacVer :: ra_machine:version(),
NewState :: khepri_machine:state().
%% @doc Converts a state to a newer version.
%%
%% @private

convert_state(State, MacVer, MacVer) ->
convert_state(State, OldMacVer, NewMacVer) ->
lists:foldl(
fun(N, State1) ->
OldMacVer1 = N,
NewMacVer1 = erlang:min(N + 1, NewMacVer),
convert_state1(State1, OldMacVer1, NewMacVer1)
end, State, lists:seq(OldMacVer, NewMacVer)).

convert_state1(State, MacVer, MacVer) ->
State;
convert_state(State, 0, 1) ->
convert_state1(State, 0, 1) ->
%% To go from version 0 to version 1, we add the `dedups' fields at the
%% end of the record. The default value is an empty map.
?assert(khepri_machine_v0:is_state(State)),
Fields0 = khepri_machine_v0:state_to_list(State),
Fields1 = Fields0 ++ [#{}],
State1 = list_to_tuple(Fields1),
?assert(is_state(State1)),
State1.
State1;
convert_state1(State, 1, 2) ->
Tree = get_tree(State),
Tree1 = khepri_tree:convert_tree(Tree, 1, 2),
set_tree(State, Tree1).

-spec update_projections(OldState, NewState) -> ok when
OldState :: khepri_machine:state(),
Expand Down
2 changes: 1 addition & 1 deletion src/khepri_machine_v0.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

-record(khepri_machine,
{config = #config{} :: khepri_machine:machine_config(),
tree = #tree{} :: khepri_tree:tree(),
tree = khepri_tree:new() :: khepri_tree:tree_v0(),
triggers = #{} ::
#{khepri:trigger_id() =>
#{sproc := khepri_path:native_path(),
Expand Down
179 changes: 179 additions & 0 deletions src/khepri_prefix_tree.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright © 2024 Broadcom. All Rights Reserved. The term "Broadcom" refers
%% to Broadcom Inc. and/or its subsidiaries.
%%

-module(khepri_prefix_tree).

%% This module defines a tree that associates paths with optional payloads and
%% specializes in lookup of tree nodes by a prefixing path.
%%
%% This tree is similar to {@link khepri_pattern_tree} but the path components
%% in this tree must be {@link khepri_path:node_id()}s rather than pattern
%% components.
%%
%% This tree is also similar to the main tree type {@link khepri_tree} but it
%% is simpler: it does not support keep-while conditions or properties for
%% tree nodes. This type is used within {@link khepri_tree} for the reverse
%% index of keep-while conditions.
%%
%% See https://en.wikipedia.org/wiki/Trie.

-include_lib("stdlib/include/assert.hrl").

-include("src/khepri_payload.hrl").

-type child_nodes(Payload) :: #{khepri_path:node_id() => tree(Payload)}.

-record(prefix_tree, {payload = ?NO_PAYLOAD,
child_nodes = #{}}).

-opaque tree(Payload) :: #prefix_tree{payload :: Payload | ?NO_PAYLOAD,
child_nodes :: child_nodes(Payload)}.

-export_type([tree/1]).

-export([empty/0,
is_prefix_tree/1,
from_map/1,
fold_prefixes_of/4,
find_path/2,
update/3]).

-spec empty() -> tree(_).
%% @doc Returns a new empty tree.
%%
%% @see tree().

empty() ->
#prefix_tree{}.

-spec is_prefix_tree(tree(_)) -> true;
(term()) -> false.
%% @doc Determines whether the given term is a prefix tree.

is_prefix_tree(#prefix_tree{}) ->
true;
is_prefix_tree(_) ->
false.

-spec from_map(Map) -> Tree when
Map :: #{khepri_path:native_path() => Payload},
Tree :: khepri_prefix_tree:tree(Payload),
Payload :: term().
%% @doc Converts a map of paths to payloads into a prefix tree.

from_map(Map) when is_map(Map) ->
maps:fold(
fun(Path, Payload, Tree) ->
update(
fun(Payload0) ->
%% Map keys are unique so this node in the prefix
%% tree must have no payload.
?assertEqual(?NO_PAYLOAD, Payload0),
Payload
end, Path, Tree)
end, empty(), Map).

-spec fold_prefixes_of(Fun, Acc, Path, Tree) -> Ret when
Fun :: fun((Payload, Acc) -> Acc1),
Acc :: term(),
Acc1 :: term(),
Path :: khepri_path:native_path(),
Tree :: khepri_prefix_tree:tree(Payload),
Payload :: term(),
Ret :: Acc1.
%% @doc Folds over all nodes in the tree which are prefixed by the given `Path'
%% building an accumulated value with the given fold function and initial
%% accumulator.

fold_prefixes_of(Fun, Acc, Path, Tree) when is_function(Fun, 2) ->
fold_prefixes_of1(Fun, Acc, Path, Tree).

fold_prefixes_of1(
Fun, Acc, [], #prefix_tree{payload = Payload, child_nodes = ChildNodes}) ->
Acc1 = case Payload of
?NO_PAYLOAD ->
Acc;
_ ->
Fun(Payload, Acc)
end,
maps:fold(
fun(_Component, Subtree, Acc2) ->
fold_prefixes_of1(Fun, Acc2, [], Subtree)
end, Acc1, ChildNodes);
fold_prefixes_of1(
Fun, Acc, [Component | Rest], #prefix_tree{child_nodes = ChildNodes}) ->
case maps:find(Component, ChildNodes) of
{ok, Subtree} ->
fold_prefixes_of1(Fun, Acc, Rest, Subtree);
error ->
Acc
end.

-spec find_path(Path, Tree) -> Ret when
Path :: khepri_path:native_path(),
Tree :: khepri_prefix_tree:tree(Payload),
Payload :: term(),
Ret :: {ok, Payload} | error.
%% @doc Returns the payload associated with a path in the tree.
%%
%% @returns `{ok, Payload}' where `Payload' is associated with the given path
%% or `error' if the path is not associated with a payload in the given tree.

find_path(Path, Tree) ->
find_path1(Path, Tree).

find_path1([], #prefix_tree{payload = Payload}) ->
case Payload of
?NO_PAYLOAD ->
error;
_ ->
{ok, Payload}
end;
find_path1([Component | Rest], #prefix_tree{child_nodes = ChildNodes}) ->
case maps:find(Component, ChildNodes) of
{ok, Subtree} ->
find_path1(Rest, Subtree);
error ->
error
end.

-spec update(Fun, Path, Tree) -> Ret when
Fun :: fun((Payload | ?NO_PAYLOAD) -> Payload | ?NO_PAYLOAD),
Path :: khepri_path:native_path(),
Tree :: khepri_prefix_tree:tree(Payload),
Payload :: term(),
Ret :: khepri_prefix_tree:tree(Payload).
%% @doc Updates a given path in the tree.
%%
%% This function can be used to create, update or delete tree nodes. If the
%% tree node does not exist for the given path, the update function is passed
%% `?NO_PAYLOAD'. If the update function returns `?NO_PAYLOAD' then the tree
%% node and all of its ancestors which do not have a payload or children are
%% removed.
%%
%% The update function is also be passed `?NO_PAYLOAD' if a tree node exists
%% but does not have a payload: being passed `?NO_PAYLOAD' is not a reliable
%% sign that a tree node did not exist prior to an update.

update(Fun, Path, Tree) ->
update1(Fun, Path, Tree).

update1(Fun, [], #prefix_tree{payload = Payload} = Tree) ->
Tree#prefix_tree{payload = Fun(Payload)};
update1(
Fun, [Component | Rest], #prefix_tree{child_nodes = ChildNodes} = Tree) ->
Subtree = maps:get(Component, ChildNodes, khepri_prefix_tree:empty()),
ChildNodes1 = case update1(Fun, Rest, Subtree) of
#prefix_tree{payload = ?NO_PAYLOAD, child_nodes = C}
when C =:= #{} ->
%% Drop unused branches.
maps:remove(Component, ChildNodes);
Subtree1 ->
maps:put(Component, Subtree1, ChildNodes)
end,
Tree#prefix_tree{child_nodes = ChildNodes1}.
Loading

0 comments on commit caf1ae3

Please sign in to comment.