From 9b91b17d59e9b290175db8ddd60e47604785da1a Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Sat, 13 Feb 2016 03:00:10 -0800 Subject: [PATCH 01/15] Catchup with dt type branch --- include/r18.hrl | 2 + rebar.config | 6 ++- src/dict_compat.erl | 49 ++++++++++++++++++++ src/dict_compat_transform.erl | 86 +++++++++++++++++++++++++++++++++++ src/orset_bm.erl | 6 +-- src/riak_dt_map.erl | 59 ++++++++++++------------ src/riak_dt_orswot.erl | 2 + 7 files changed, 178 insertions(+), 32 deletions(-) create mode 100644 include/r18.hrl create mode 100644 src/dict_compat.erl create mode 100644 src/dict_compat_transform.erl diff --git a/include/r18.hrl b/include/r18.hrl new file mode 100644 index 0000000..a59c504 --- /dev/null +++ b/include/r18.hrl @@ -0,0 +1,2 @@ +-type dict() :: dict:dict(). +-type set() :: sets:set(). \ No newline at end of file diff --git a/rebar.config b/rebar.config index f25336b..3ac4b17 100644 --- a/rebar.config +++ b/rebar.config @@ -1,5 +1,9 @@ %% -*- erlang -*- {cover_enabled, true}. -{erl_opts, [debug_info, warnings_as_errors]}. +{erl_opts, [debug_info, warnings_as_errors, {parse_transform, dict_compat_transform}]}. {eunit_opts, [verbose]}. {xref_checks, [undefined_function_calls]}. + +{deps, [ + {parse_trans, ".*", {git, "https://github.com/uwiger/parse_trans.git", master}} +]}. \ No newline at end of file diff --git a/src/dict_compat.erl b/src/dict_compat.erl new file mode 100644 index 0000000..e3c87dc --- /dev/null +++ b/src/dict_compat.erl @@ -0,0 +1,49 @@ +%%%------------------------------------------------------------------- +%%% @author sdhillon +%%% @copyright (C) 2016, +%%% @doc +%%% +%%% @end +%%% Created : 09. Feb 2016 12:31 AM +%%%------------------------------------------------------------------- +-module(dict_compat). +-author("sdhillon"). + +-export([update/3, update/4, merge/3]). + +update(Key, Fun, Map) -> + Value = map:get(Key, Map), + Value1 = Fun(Value), + maps:put(Key, Value1, Map). + +update(Key, Fun, Initial, Map) -> + Value = map:get(Key, Map, Initial), + Value1 = Fun(Value), + maps:put(Key, Value1, Map). + +merge(Fun, Map1, Map2) -> + Map1KeysSet = ordsets:from_list(maps:keys(Map1)), + Map2KeysSet = ordsets:from_list(maps:keys(Map2)), + KeysToMerge = ordsets:intersection(Map1KeysSet, Map2KeysSet), + KeysToAddFromMap1 = ordsets:subtract(Map1KeysSet, KeysToMerge), + KeysToAddFromMap2 = ordsets:subtract(Map2KeysSet, KeysToMerge), + Map1Subset = maps:with(KeysToAddFromMap1, Map1), + Map2Subset = maps:with(KeysToAddFromMap2, Map2), + CombinedMap = maps:merge(Map1Subset, Map2Subset), + {_, _, CombinedMap1} = + lists:foldl(merge2(Fun), {Map1, Map2, CombinedMap}, KeysToMerge), + CombinedMap1. + +merge2(Fun) -> + fun(Key, {Map1, Map2, CombinedMap}) -> + Map1Value = maps:get(Key, Map1), + Map2Value = maps:get(Key, Map2), + NewValue = Fun(Key, Map1Value, Map2Value), + CombinedMap1 = maps:put(Key, NewValue, CombinedMap), + {Map1, Map2, CombinedMap1} + end. + + + + + diff --git a/src/dict_compat_transform.erl b/src/dict_compat_transform.erl new file mode 100644 index 0000000..445a1fc --- /dev/null +++ b/src/dict_compat_transform.erl @@ -0,0 +1,86 @@ +-module(dict_compat_transform). + +-export([parse_transform/2]). + +-define(FAKEDICT, dict). +parse_transform(Forms, _Options) -> + case has_maps() of + true -> + parse_trans:plain_transform(fun do_transform/1, Forms); + false -> + Forms + end. + +do_transform({call, L, {remote, L, {atom, L, ?FAKEDICT}, {atom, L, new}}, []}) -> + %[Body] = parse_trans:plain_transform(fun do_transform/1, [Body]), + %{function,L1,store,Arity,Body}; + {call, L, {remote, L, {atom, L, maps}, {atom, L, new}}, []}; + +do_transform({call, L, {remote, L, {atom, L, ?FAKEDICT}, {atom, L, fold}}, [Fun, Init, Val]}) -> + [NewFun] = parse_trans:plain_transform(fun do_transform/1, [Fun]), + [NewInit] = parse_trans:plain_transform(fun do_transform/1, [Init]), + [NewVal] = parse_trans:plain_transform(fun do_transform/1, [Val]), + {call, L, {remote, L, {atom, L, maps}, {atom, L, fold}}, [NewFun, NewInit, NewVal]}; +do_transform({call, L, {remote, L, {atom, L, ?FAKEDICT}, {atom, L, find}}, [Key, MaybeMap]}) -> + [NewKey] = parse_trans:plain_transform(fun do_transform/1, [Key]), + [NewMaybeMap] = parse_trans:plain_transform(fun do_transform/1, [MaybeMap]), + {call, L, {remote, L, {atom, L, maps}, {atom, L, find}}, [NewKey, NewMaybeMap]}; +do_transform({call, L, {remote, L, {atom, L, ?FAKEDICT}, {atom, L, store}}, [Key, Value, Datastructure]}) -> + [NewKey] = parse_trans:plain_transform(fun do_transform/1, [Key]), + [NewValue] = parse_trans:plain_transform(fun do_transform/1, [Value]), + [NewDatastructure] = parse_trans:plain_transform(fun do_transform/1, [Datastructure]), + {call, L, {remote, L, {atom, L, maps}, {atom, L, put}}, [NewKey, NewValue, NewDatastructure]}; +do_transform({call, L, {remote, L, {atom, L, ?FAKEDICT}, {atom, L, erase}}, [Key, Datastructure]}) -> + [NewKey] = parse_trans:plain_transform(fun do_transform/1, [Key]), + [NewDatastructure] = parse_trans:plain_transform(fun do_transform/1, [Datastructure]), + {call, L, {remote, L, {atom, L, maps}, {atom, L, remove}}, [NewKey, NewDatastructure]}; +do_transform({call, L, {remote, L, {atom, L, ?FAKEDICT}, {atom, L, update}}, [Key, Fun, Datastructure]}) -> + [NewKey] = parse_trans:plain_transform(fun do_transform/1, [Key]), + [NewFun] = parse_trans:plain_transform(fun do_transform/1, [Fun]), + [NewDatastructure] = parse_trans:plain_transform(fun do_transform/1, [Datastructure]), + {call, L, {remote, L, {atom, L, dict_compat}, {atom, L, update}}, [NewKey, NewFun, NewDatastructure]}; +do_transform({call, L, {remote, L, {atom, L, ?FAKEDICT}, {atom, L, update}}, [Key, Fun, Datastructure, Initial]}) -> + [NewKey] = parse_trans:plain_transform(fun do_transform/1, [Key]), + [NewFun] = parse_trans:plain_transform(fun do_transform/1, [Fun]), + [NewDatastructure] = parse_trans:plain_transform(fun do_transform/1, [Datastructure]), + [NewInitial] = parse_trans:plain_transform(fun do_transform/1, [Initial]), + {call, L, {remote, L, {atom, L, dict_compat}, {atom, L, update}}, [NewKey, NewFun, NewDatastructure, NewInitial]}; +do_transform({call, L, {remote, L, {atom, L, ?FAKEDICT}, {atom, L, fetch_keys}}, [Datastructure]}) -> + [NewDatastructure] = parse_trans:plain_transform(fun do_transform/1, [Datastructure]), + {call, L, {remote, L, {atom, L, maps}, {atom, L, keys}}, [NewDatastructure]}; +do_transform({call, L, {remote, L, {atom, L, ?FAKEDICT}, {atom, L, to_list}}, [Datastructure]}) -> + [NewDatastructure] = parse_trans:plain_transform(fun do_transform/1, [Datastructure]), + {call, L, {remote, L, {atom, L, maps}, {atom, L, to_list}}, [NewDatastructure]}; +do_transform({call, L, {remote, L, {atom, L, ?FAKEDICT}, {atom, L, size}}, [Datastructure]}) -> + [NewDatastructure] = parse_trans:plain_transform(fun do_transform/1, [Datastructure]), + {call, L, {remote, L, {atom, L, maps}, {atom, L, size}}, [NewDatastructure]}; +do_transform({call, L, {remote, L, {atom, L, ?FAKEDICT}, {atom, L, from_list}}, [Datastructure]}) -> + [NewDatastructure] = parse_trans:plain_transform(fun do_transform/1, [Datastructure]), + {call, L, {remote, L, {atom, L, maps}, {atom, L, from_list}}, [NewDatastructure]}; +do_transform({call, L, {remote, L, {atom, L, ?FAKEDICT}, {atom, L, merge}}, [Fun, Datastructure1, Datastructure2]}) -> + [NewFun] = parse_trans:plain_transform(fun do_transform/1, [Fun]), + [NewDatastructure1] = parse_trans:plain_transform(fun do_transform/1, [Datastructure1]), + [NewDatastructure2] = parse_trans:plain_transform(fun do_transform/1, [Datastructure2]), + {call, L, {remote, L, {atom, L, dict_compat}, {atom, L, merge}}, [NewFun, NewDatastructure1, NewDatastructure2]}; + +do_transform({call, L, {remote, L, {atom, L, ?FAKEDICT}, {atom, L, map}}, [Fun, Datastructure]}) -> + [NewFun] = parse_trans:plain_transform(fun do_transform/1, [Fun]), + [NewDatastructure] = parse_trans:plain_transform(fun do_transform/1, [Datastructure]), + {call, L, {remote, L, {atom, L, maps}, {atom, L, map}}, [NewFun, NewDatastructure]}; +do_transform(_Form) -> + %io:format("Visited Form: ~p~n", [Form]), + continue. + +% For debugging +%print_transform(Form) -> +% io:format("Visited Form: ~w~n", [Form]), +% continue. + +has_maps() -> + try maps:new() of + _ -> + true + catch + _ -> + false + end. \ No newline at end of file diff --git a/src/orset_bm.erl b/src/orset_bm.erl index 6ff2964..b7f1121 100644 --- a/src/orset_bm.erl +++ b/src/orset_bm.erl @@ -19,15 +19,15 @@ -define(N, 10). start(Cmds, Mod) -> - TS = erlang:now(), + TS = erlang:monotonic_time(), Coord = start_coordinator(?N, Mod, self()), lists:foreach(fun(I) -> start_proc(I, Cmds, Mod, Coord) end, lists:seq(1, ?N)), receive {Coord, Results} -> - TE = erlang:now(), + TE = erlang:monotonic_time(), Size = m_size(Mod, Results), - io:format("Results are ~p in ~p~n", [Size, timer:now_diff(TE, TS) / 1000]) + io:format("Results are ~p in ~p~n", [Size, erlang:convert_time_unit(TE - TS, native, micro_seconds)]) end. diff --git a/src/riak_dt_map.erl b/src/riak_dt_map.erl index 054f7de..93cc2f9 100644 --- a/src/riak_dt_map.erl +++ b/src/riak_dt_map.erl @@ -167,6 +167,9 @@ -behaviour(riak_dt). +-include("r18.hrl"). + + -ifdef(EQC). -include_lib("eqc/include/eqc.hrl"). -endif. @@ -186,10 +189,10 @@ -export([gen_op/0, gen_op/1, gen_field/0, gen_field/1, generate/0, size/1]). -endif. --export_type([map/0, binary_map/0, map_op/0]). +-export_type([dt_map/0, binary_map/0, map_op/0]). -type binary_map() :: binary(). %% A binary that from_binary/1 will accept --type map() :: {riak_dt_vclock:vclock(), entries(), deferred()}. +-type dt_map() :: {riak_dt_vclock:vclock(), entries(), deferred()}. -type entries() :: [field()]. -type field() :: {field_name(), field_value()}. -type field_name() :: {Name :: binary(), CRDTModule :: crdt_mod()}. @@ -215,7 +218,7 @@ -type crdt() :: riak_dt_emcntr:emcntr() | riak_dt_od_flag:od_flag() | riak_dt_lwwreg:lwwreg() | riak_dt_orswot:orswot() | - riak_dt_map:map(). + riak_dt_map:dt_map(). -type map_op() :: {update, [map_field_update() | map_field_op()]}. @@ -234,18 +237,18 @@ -type precondition_error() :: {error, {precondition, {not_present, field()}}}. %% @doc Create a new, empty Map. --spec new() -> map(). +-spec new() -> dt_map(). new() -> {riak_dt_vclock:fresh(), orddict:new(), orddict:new()}. %% @doc sets the clock in the map to that `Clock'. Used by a %% containing Map for sub-CRDTs --spec parent_clock(riak_dt_vclock:vclock(), map()) -> map(). +-spec parent_clock(riak_dt_vclock:vclock(), dt_map()) -> dt_map(). parent_clock(Clock, {_MapClock, Values, Deferred}) -> {Clock, Values, Deferred}. %% @doc get the current set of values for this Map --spec value(map()) -> values(). +-spec value(dt_map()) -> values(). value({_Clock, Values, _Deferred}) -> orddict:fold(fun({Name, Type}, CRDTs, Acc) -> Merged = merge_crdts(Type, CRDTs), @@ -271,11 +274,11 @@ merge_crdts(Type, {CRDTs, TS}) -> Type:merge(TS, V). %% @doc query map (not implemented yet) --spec value(term(), map()) -> values(). +-spec value(term(), dt_map()) -> values(). value(_, Map) -> value(Map). -%% @doc update the `map()' or a field in the `map()' by executing +%% @doc update the `dt_map()' or a field in the `dt_map()' by executing %% the `map_op()'. `Ops' is a list of one or more of the following %% ops: %% @@ -291,8 +294,8 @@ value(_, Map) -> %% still present, and it's value will contain the concurrent update. %% %% Atomic, all of `Ops' are performed successfully, or none are. --spec update(map_op(), riak_dt:actor() | riak_dt:dot(), map()) -> - {ok, map()} | precondition_error(). +-spec update(map_op(), riak_dt:actor() | riak_dt:dot(), dt_map()) -> + {ok, dt_map()} | precondition_error(). update(Op, ActorOrDot, Map) -> update(Op, ActorOrDot, Map, undefined). @@ -302,8 +305,8 @@ update(Op, ActorOrDot, Map) -> %% types. hence the common clock. %% %% @see parent_clock/2 --spec update(map_op(), riak_dt:actor() | riak_dt:dot(), map(), riak_dt:context()) -> - {ok, map()}. +-spec update(map_op(), riak_dt:actor() | riak_dt:dot(), dt_map(), riak_dt:context()) -> + {ok, dt_map()}. update({update, Ops}, ActorOrDot, {Clock0, Values, Deferred}, Ctx) -> {Dot, Clock} = update_clock(ActorOrDot, Clock0), apply_ops(Ops, Dot, {Clock, Values, Deferred}, Ctx). @@ -324,7 +327,7 @@ update_clock(Actor, Clock) -> %% @private -spec apply_ops([map_field_update() | map_field_op()], riak_dt:dot(), {riak_dt_vclock:vclock(), entries() , deferred()}, context()) -> - {ok, map()} | precondition_error(). + {ok, dt_map()} | precondition_error(). apply_ops([], _Dot, Map, _Ctx) -> {ok, Map}; apply_ops([{update, {_Name, Type}=Field, Op} | Rest], Dot, {Clock, Values, Deferred}, Ctx) -> @@ -363,8 +366,8 @@ apply_ops([{remove, Field} | Rest], Dot, Map, Ctx) -> %% %% @see defer_remove/4 for handling of removes of fields that are %% _not_ present --spec remove_field(field(), map(), context()) -> - {ok, map()} | precondition_error(). +-spec remove_field(field(), dt_map(), context()) -> + {ok, dt_map()} | precondition_error(). remove_field(Field, {Clock, Values, Deferred}, undefined) -> case orddict:find(Field, Values) of error -> @@ -437,8 +440,8 @@ defer_remove(Clock, Ctx, Field, Deferred) -> Deferred) end. -%% @doc merge two `map()'s. --spec merge(map(), map()) -> map(). +%% @doc merge two `dt_map()'s. +-spec merge(dt_map(), dt_map()) -> dt_map(). merge(Map, Map) -> Map; %% @TODO is there a way to optimise this, based on clocks maybe? @@ -569,8 +572,8 @@ apply_deferred(Clock, Entries, Deferred) -> Deferred). %% @private --spec remove_all([field()], map(), context()) -> - map(). +-spec remove_all([field()], dt_map(), context()) -> + dt_map(). remove_all(Fields, Map, Ctx) -> lists:foldl(fun(Field, MapAcc) -> {ok, MapAcc2}= remove_field(Field, MapAcc, Ctx), @@ -579,10 +582,10 @@ remove_all(Fields, Map, Ctx) -> Map, Fields). -%% @doc compare two `map()'s for equality of structure Both schemas +%% @doc compare two `dt_map()'s for equality of structure Both schemas %% and value list must be equal. Performs a pariwise equals for all %% values in the value lists --spec equal(map(), map()) -> boolean(). +-spec equal(dt_map(), dt_map()) -> boolean(). equal({Clock1, Values1, Deferred1}, {Clock2, Values2, Deferred2}) -> riak_dt_vclock:equal(Clock1, Clock2) andalso Deferred1 == Deferred2 andalso @@ -613,7 +616,7 @@ pairwise_equals(_, _) -> %% that only seen fields are removed. If a field removal operation has %% a context that the Map has not seen, it will be deferred until %% causally relevant. --spec precondition_context(map()) -> riak_dt:context(). +-spec precondition_context(dt_map()) -> riak_dt:context(). precondition_context({Clock, _Field, _Deferred}) -> Clock. @@ -625,11 +628,11 @@ precondition_context({Clock, _Field, _Deferred}) -> %% basically `field_count' - ( unique fields) %% `deferred_length': How many operations on the deferred list, a reasonable expression %% of lag/staleness. --spec stats(map()) -> [{atom(), integer()}]. +-spec stats(dt_map()) -> [{atom(), integer()}]. stats(Map) -> [ {S, stat(S, Map)} || S <- [actor_count, field_count, duplication, deferred_length]]. --spec stat(atom(), map()) -> number() | undefined. +-spec stat(atom(), dt_map()) -> number() | undefined. stat(actor_count, {Clock, _, _}) -> length(Clock); stat(field_count, {_, Fields, _}) -> @@ -650,7 +653,7 @@ stat(_,_) -> undefined. -define(TAG, ?DT_MAP_TAG). -define(V1_VERS, 1). -%% @doc returns a binary representation of the provided `map()'. The +%% @doc returns a binary representation of the provided `dt_map()'. The %% resulting binary is tagged and versioned for ease of future %% upgrade. Calling `from_binary/1' with the result of this function %% will return the original map. Use the application env var @@ -658,15 +661,15 @@ stat(_,_) -> undefined. %% (`false') %% %% @see `from_binary/1' --spec to_binary(map()) -> binary_map(). +-spec to_binary(dt_map()) -> binary_map(). to_binary(Map) -> <>. %% @doc When the argument is a `binary_map()' produced by -%% `to_binary/1' will return the original `map()'. +%% `to_binary/1' will return the original `dt_map()'. %% %% @see `to_binary/1' --spec from_binary(binary_map()) -> map(). +-spec from_binary(binary_map()) -> dt_map(). from_binary(<>) -> riak_dt:from_binary(B). diff --git a/src/riak_dt_orswot.erl b/src/riak_dt_orswot.erl index c57c44d..6cc83a7 100644 --- a/src/riak_dt_orswot.erl +++ b/src/riak_dt_orswot.erl @@ -62,6 +62,8 @@ -behaviour(riak_dt). +-include("r18.hrl"). + -ifdef(EQC). -include_lib("eqc/include/eqc.hrl"). -define(QC_OUT(P), From 98133f965b27085bb8057bce56c18a10da40af8e Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Mon, 30 May 2016 03:55:06 -0700 Subject: [PATCH 02/15] add faster riak_dt_vclock impl with benchmarks --- rebar.config | 5 +- src/dict_compat.erl | 49 ------- src/dict_compat_transform.erl | 86 ------------- src/riak_dt_vclock.erl | 233 ++++++++++++++++++++++------------ 4 files changed, 156 insertions(+), 217 deletions(-) delete mode 100644 src/dict_compat.erl delete mode 100644 src/dict_compat_transform.erl diff --git a/rebar.config b/rebar.config index 3ac4b17..2ec3da2 100644 --- a/rebar.config +++ b/rebar.config @@ -1,9 +1,6 @@ %% -*- erlang -*- {cover_enabled, true}. -{erl_opts, [debug_info, warnings_as_errors, {parse_transform, dict_compat_transform}]}. +{erl_opts, [debug_info, warnings_as_errors]}. {eunit_opts, [verbose]}. {xref_checks, [undefined_function_calls]}. -{deps, [ - {parse_trans, ".*", {git, "https://github.com/uwiger/parse_trans.git", master}} -]}. \ No newline at end of file diff --git a/src/dict_compat.erl b/src/dict_compat.erl deleted file mode 100644 index e3c87dc..0000000 --- a/src/dict_compat.erl +++ /dev/null @@ -1,49 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author sdhillon -%%% @copyright (C) 2016, -%%% @doc -%%% -%%% @end -%%% Created : 09. Feb 2016 12:31 AM -%%%------------------------------------------------------------------- --module(dict_compat). --author("sdhillon"). - --export([update/3, update/4, merge/3]). - -update(Key, Fun, Map) -> - Value = map:get(Key, Map), - Value1 = Fun(Value), - maps:put(Key, Value1, Map). - -update(Key, Fun, Initial, Map) -> - Value = map:get(Key, Map, Initial), - Value1 = Fun(Value), - maps:put(Key, Value1, Map). - -merge(Fun, Map1, Map2) -> - Map1KeysSet = ordsets:from_list(maps:keys(Map1)), - Map2KeysSet = ordsets:from_list(maps:keys(Map2)), - KeysToMerge = ordsets:intersection(Map1KeysSet, Map2KeysSet), - KeysToAddFromMap1 = ordsets:subtract(Map1KeysSet, KeysToMerge), - KeysToAddFromMap2 = ordsets:subtract(Map2KeysSet, KeysToMerge), - Map1Subset = maps:with(KeysToAddFromMap1, Map1), - Map2Subset = maps:with(KeysToAddFromMap2, Map2), - CombinedMap = maps:merge(Map1Subset, Map2Subset), - {_, _, CombinedMap1} = - lists:foldl(merge2(Fun), {Map1, Map2, CombinedMap}, KeysToMerge), - CombinedMap1. - -merge2(Fun) -> - fun(Key, {Map1, Map2, CombinedMap}) -> - Map1Value = maps:get(Key, Map1), - Map2Value = maps:get(Key, Map2), - NewValue = Fun(Key, Map1Value, Map2Value), - CombinedMap1 = maps:put(Key, NewValue, CombinedMap), - {Map1, Map2, CombinedMap1} - end. - - - - - diff --git a/src/dict_compat_transform.erl b/src/dict_compat_transform.erl deleted file mode 100644 index 445a1fc..0000000 --- a/src/dict_compat_transform.erl +++ /dev/null @@ -1,86 +0,0 @@ --module(dict_compat_transform). - --export([parse_transform/2]). - --define(FAKEDICT, dict). -parse_transform(Forms, _Options) -> - case has_maps() of - true -> - parse_trans:plain_transform(fun do_transform/1, Forms); - false -> - Forms - end. - -do_transform({call, L, {remote, L, {atom, L, ?FAKEDICT}, {atom, L, new}}, []}) -> - %[Body] = parse_trans:plain_transform(fun do_transform/1, [Body]), - %{function,L1,store,Arity,Body}; - {call, L, {remote, L, {atom, L, maps}, {atom, L, new}}, []}; - -do_transform({call, L, {remote, L, {atom, L, ?FAKEDICT}, {atom, L, fold}}, [Fun, Init, Val]}) -> - [NewFun] = parse_trans:plain_transform(fun do_transform/1, [Fun]), - [NewInit] = parse_trans:plain_transform(fun do_transform/1, [Init]), - [NewVal] = parse_trans:plain_transform(fun do_transform/1, [Val]), - {call, L, {remote, L, {atom, L, maps}, {atom, L, fold}}, [NewFun, NewInit, NewVal]}; -do_transform({call, L, {remote, L, {atom, L, ?FAKEDICT}, {atom, L, find}}, [Key, MaybeMap]}) -> - [NewKey] = parse_trans:plain_transform(fun do_transform/1, [Key]), - [NewMaybeMap] = parse_trans:plain_transform(fun do_transform/1, [MaybeMap]), - {call, L, {remote, L, {atom, L, maps}, {atom, L, find}}, [NewKey, NewMaybeMap]}; -do_transform({call, L, {remote, L, {atom, L, ?FAKEDICT}, {atom, L, store}}, [Key, Value, Datastructure]}) -> - [NewKey] = parse_trans:plain_transform(fun do_transform/1, [Key]), - [NewValue] = parse_trans:plain_transform(fun do_transform/1, [Value]), - [NewDatastructure] = parse_trans:plain_transform(fun do_transform/1, [Datastructure]), - {call, L, {remote, L, {atom, L, maps}, {atom, L, put}}, [NewKey, NewValue, NewDatastructure]}; -do_transform({call, L, {remote, L, {atom, L, ?FAKEDICT}, {atom, L, erase}}, [Key, Datastructure]}) -> - [NewKey] = parse_trans:plain_transform(fun do_transform/1, [Key]), - [NewDatastructure] = parse_trans:plain_transform(fun do_transform/1, [Datastructure]), - {call, L, {remote, L, {atom, L, maps}, {atom, L, remove}}, [NewKey, NewDatastructure]}; -do_transform({call, L, {remote, L, {atom, L, ?FAKEDICT}, {atom, L, update}}, [Key, Fun, Datastructure]}) -> - [NewKey] = parse_trans:plain_transform(fun do_transform/1, [Key]), - [NewFun] = parse_trans:plain_transform(fun do_transform/1, [Fun]), - [NewDatastructure] = parse_trans:plain_transform(fun do_transform/1, [Datastructure]), - {call, L, {remote, L, {atom, L, dict_compat}, {atom, L, update}}, [NewKey, NewFun, NewDatastructure]}; -do_transform({call, L, {remote, L, {atom, L, ?FAKEDICT}, {atom, L, update}}, [Key, Fun, Datastructure, Initial]}) -> - [NewKey] = parse_trans:plain_transform(fun do_transform/1, [Key]), - [NewFun] = parse_trans:plain_transform(fun do_transform/1, [Fun]), - [NewDatastructure] = parse_trans:plain_transform(fun do_transform/1, [Datastructure]), - [NewInitial] = parse_trans:plain_transform(fun do_transform/1, [Initial]), - {call, L, {remote, L, {atom, L, dict_compat}, {atom, L, update}}, [NewKey, NewFun, NewDatastructure, NewInitial]}; -do_transform({call, L, {remote, L, {atom, L, ?FAKEDICT}, {atom, L, fetch_keys}}, [Datastructure]}) -> - [NewDatastructure] = parse_trans:plain_transform(fun do_transform/1, [Datastructure]), - {call, L, {remote, L, {atom, L, maps}, {atom, L, keys}}, [NewDatastructure]}; -do_transform({call, L, {remote, L, {atom, L, ?FAKEDICT}, {atom, L, to_list}}, [Datastructure]}) -> - [NewDatastructure] = parse_trans:plain_transform(fun do_transform/1, [Datastructure]), - {call, L, {remote, L, {atom, L, maps}, {atom, L, to_list}}, [NewDatastructure]}; -do_transform({call, L, {remote, L, {atom, L, ?FAKEDICT}, {atom, L, size}}, [Datastructure]}) -> - [NewDatastructure] = parse_trans:plain_transform(fun do_transform/1, [Datastructure]), - {call, L, {remote, L, {atom, L, maps}, {atom, L, size}}, [NewDatastructure]}; -do_transform({call, L, {remote, L, {atom, L, ?FAKEDICT}, {atom, L, from_list}}, [Datastructure]}) -> - [NewDatastructure] = parse_trans:plain_transform(fun do_transform/1, [Datastructure]), - {call, L, {remote, L, {atom, L, maps}, {atom, L, from_list}}, [NewDatastructure]}; -do_transform({call, L, {remote, L, {atom, L, ?FAKEDICT}, {atom, L, merge}}, [Fun, Datastructure1, Datastructure2]}) -> - [NewFun] = parse_trans:plain_transform(fun do_transform/1, [Fun]), - [NewDatastructure1] = parse_trans:plain_transform(fun do_transform/1, [Datastructure1]), - [NewDatastructure2] = parse_trans:plain_transform(fun do_transform/1, [Datastructure2]), - {call, L, {remote, L, {atom, L, dict_compat}, {atom, L, merge}}, [NewFun, NewDatastructure1, NewDatastructure2]}; - -do_transform({call, L, {remote, L, {atom, L, ?FAKEDICT}, {atom, L, map}}, [Fun, Datastructure]}) -> - [NewFun] = parse_trans:plain_transform(fun do_transform/1, [Fun]), - [NewDatastructure] = parse_trans:plain_transform(fun do_transform/1, [Datastructure]), - {call, L, {remote, L, {atom, L, maps}, {atom, L, map}}, [NewFun, NewDatastructure]}; -do_transform(_Form) -> - %io:format("Visited Form: ~p~n", [Form]), - continue. - -% For debugging -%print_transform(Form) -> -% io:format("Visited Form: ~w~n", [Form]), -% continue. - -has_maps() -> - try maps:new() of - _ -> - true - catch - _ -> - false - end. \ No newline at end of file diff --git a/src/riak_dt_vclock.erl b/src/riak_dt_vclock.erl index 76b8984..ab5fbd5 100644 --- a/src/riak_dt_vclock.erl +++ b/src/riak_dt_vclock.erl @@ -30,6 +30,7 @@ %% pp. 215-226 -module(riak_dt_vclock). +-compile(inline_list_funcs). -export([fresh/0,descends/2,merge/1,get_counter/2, subtract_dots/2, increment/2,all_nodes/1, equal/2, @@ -42,36 +43,58 @@ -export_type([vclock/0, vclock_node/0, binary_vclock/0]). -type vclock() :: [vc_entry()]. +-type sorted_vclock() :: orddict:orddict(vclock_node(), counter()). -type binary_vclock() :: binary(). % The timestamp is present but not used, in case a client wishes to inspect it. -type vc_entry() :: {vclock_node(), counter()}. % Nodes can have any term() as a name, but they must differ from each other. -type vclock_node() :: term(). --type counter() :: integer(). +-type counter() :: non_neg_integer(). % @doc Create a brand new vclock. -spec fresh() -> vclock(). fresh() -> - []. + orddict:new(). % @doc Return true if Va is a direct descendant of Vb, else false -- remember, a vclock is its own descendant! -spec descends(Va :: vclock()|[], Vb :: vclock()|[]) -> boolean(). -descends(_, []) -> - % all vclocks descend from the empty vclock - true; -descends(Va, Vb) -> - [{NodeB, CtrB} |RestB] = Vb, - case lists:keyfind(NodeB, 1, Va) of - false -> - false; - {_, CtrA} -> - (CtrA >= CtrB) andalso descends(Va,RestB) - end. +descends(Va0, Vb0) -> + Va1 = lists:usort(Va0), + Vb1 = lists:usort(Vb0), + descends1(Va1, Vb1). + +-spec descends1(Va :: sorted_vclock()|[], Vb :: sorted_vclock()|[]) -> boolean(). +descends1(_, []) -> + true; +descends1([{Actor, CounterA}|RestA], [{Actor, CounterB}|RestB]) when CounterA >= CounterB -> + descends1(RestA, RestB); +descends1([{ActorA, _CounterA}|RestA], Vb = [{ActorB, _CounterB}|_RestB]) when ActorB >= ActorA -> + descends1(RestA, Vb); +descends1(_, _) -> + false. + +% @doc Return true if Va strictly dominates Vb, else false! -spec dominates(vclock(), vclock()) -> boolean(). -dominates(A, B) -> - descends(A, B) andalso not descends(B, A). +dominates(Va0, Vb0) -> + Va1 = lists:usort(Va0), + Vb1 = lists:usort(Vb0), + dominates1(Va1, Vb1, false). + +-spec dominates1(sorted_vclock(), sorted_vclock(), boolean()) -> boolean(). +dominates1(_, [], true) -> true; +dominates1([], [], HasDominated) -> HasDominated; +dominates1([{Actor, CounterA}|RestA], [{Actor, CounterB}|RestB], _HasDominated) when CounterA > CounterB -> + dominates1(RestA, RestB, true); +dominates1([{Actor, Counter}|RestA], [{Actor, Counter}|RestB], HasDominated) -> + dominates1(RestA, RestB, HasDominated); +dominates1([{Actor, CounterA}|_RestA], [{Actor, CounterB}|_RestB], _HasDominated) when CounterB > CounterA -> + false; +dominates1([{ActorA, _CounterA}|RestA], Vb = [{ActorB, _CounterB}|_RestB], HasDominated) when ActorB >= ActorA -> + dominates1(RestA, Vb, HasDominated); +dominates1(Va = [{ActorA, _CounterA}|_RestA], [{ActorB, _CounterB}|RestB], HasDominated) when ActorA >= ActorB -> + dominates1(Va, RestB, HasDominated). %% @doc subtract the VClock from the DotList. %% what this means is that any {actor(), count()} pair in @@ -80,51 +103,55 @@ dominates(A, B) -> %% [{a, 4}, {b, 1}, {c, 1}, {d, 14}, {e, 5}, {f, 2}] = %% [{{b, 2}, {g, 22}] -spec subtract_dots(vclock(), vclock()) -> vclock(). -subtract_dots(DotList, VClock) -> - drop_dots(DotList, VClock, []). +subtract_dots(DotList0, VClock0) -> + DotList1 = lists:usort(DotList0), + VClock1 = lists:usort(VClock0), + drop_dots(DotList1, VClock1, []). +%% A - B drop_dots([], _Clock, NewDots) -> - lists:sort(NewDots); -drop_dots([{Actor, Count}=Dot | Rest], Clock, Acc) -> - case get_counter(Actor, Clock) of - Cnt when Cnt >= Count -> - %% Dot is dominated by clock, drop it - drop_dots(Rest, Clock, Acc); - _ -> - drop_dots(Rest, Clock, [Dot | Acc]) - end. + lists:reverse(NewDots); +drop_dots(A, [], NewDots) -> + lists:reverse(A) ++ lists:reverse(NewDots); +drop_dots([Dot = {Actor, CountA} | RestA], [{Actor, CountB} | RestB], Acc) when CountA > CountB -> + drop_dots(RestA, RestB, [Dot|Acc]); +drop_dots([{Actor, CountA} | RestA], [{Actor, CountB} | RestB], Acc) when CountA =< CountB -> + drop_dots(RestA, RestB, Acc); +drop_dots([Dot = {ActorA, _CountA} | RestA], B = [{ActorB, _CountB} | _RestB], Acc) when ActorB > ActorA -> + drop_dots(RestA, B, [Dot|Acc]); +drop_dots(A = [{ActorA, _CountA} | _RestA], [{ActorB, _CountB} | RestB], Acc) when ActorA > ActorB -> + drop_dots(A, RestB, Acc). + + % @doc Combine all VClocks in the input list into their least possible % common descendant. -spec merge(VClocks :: [vclock()]) -> vclock() | []. -merge([]) -> []; -merge([SingleVclock]) -> SingleVclock; -merge([First|Rest]) -> merge(Rest, lists:keysort(1, First)). - -merge([], NClock) -> NClock; -merge([AClock|VClocks],NClock) -> - merge(VClocks, merge(lists:keysort(1, AClock), NClock, [])). - -merge([], [], AccClock) -> lists:reverse(AccClock); -merge([], Left, AccClock) -> lists:reverse(AccClock, Left); -merge(Left, [], AccClock) -> lists:reverse(AccClock, Left); -merge(V=[{Node1, Ctr1}=NCT1|VClock], - N=[{Node2,Ctr2}=NCT2|NClock], AccClock) -> - if Node1 < Node2 -> - merge(VClock, N, [NCT1|AccClock]); - Node1 > Node2 -> - merge(V, NClock, [NCT2|AccClock]); - true -> - CT = if Ctr1 > Ctr2 -> Ctr1; - Ctr1 < Ctr2 -> Ctr2; - true -> Ctr1 - end, - merge(VClock, NClock, [{Node1,CT}|AccClock]) - end. +merge(VClocks0) -> + VClocks1 = lists:merge(VClocks0), + VClocks2 = lists:usort(VClocks1), + VClocks3 = + lists:foldl( + fun + ({Actor, ClockA}, [{Actor, ClockB}|Acc]) when ClockB >= ClockA -> + Acc; + (Dot = {Actor, ClockA}, [{Actor, ClockB}|Acc]) when ClockA > ClockB -> + [Dot|Acc]; + (Dot, Acc) -> + [Dot|Acc] + end, + fresh(), + VClocks2 + ), + lists:reverse(VClocks3). + % @doc Get the counter value in VClock set from Node. -spec get_counter(Node :: vclock_node(), VClock :: vclock()) -> counter(). get_counter(Node, VClock) -> + %% No reason to try sorting it + %% Best case scenario sort is O(N) + %% This function's worst case scenario is O(N) case lists:keyfind(Node, 1, VClock) of {_, Ctr} -> Ctr; false -> 0 @@ -133,30 +160,25 @@ get_counter(Node, VClock) -> % @doc Increment VClock at Node. -spec increment(Node :: vclock_node(), VClock :: vclock()) -> vclock(). -increment(Node, VClock) -> - {Ctr,NewV} = case lists:keytake(Node, 1, VClock) of - false -> - {1, VClock}; - {value, {_N, C}, ModV} -> - {C + 1, ModV} - end, - [{Node,Ctr}|NewV]. - +increment(Node, VClock0) -> + VClock1 = lists:usort(VClock0), + orddict:update(Node, fun(X) -> X + 1 end, 1, VClock1). % @doc Return the list of all nodes that have ever incremented VClock. -spec all_nodes(VClock :: vclock()) -> [vclock_node()]. -all_nodes(VClock) -> - [X || {X, _} <- sort(VClock)]. +all_nodes(VClock0) -> + VClock1 = lists:usort(VClock0), + lists:usort([X || {X, _} <- VClock1]). % @doc Compares two VClocks for equality. -spec equal(VClockA :: vclock(), VClockB :: vclock()) -> boolean(). equal(VA,VB) -> - lists:sort(VA) =:= lists:sort(VB). + lists:usort(VA) =:= lists:usort(VB). %% @doc sorts the vclock by actor -spec sort(vclock()) -> vclock(). sort(Clock) -> - lists:sort(Clock). + lists:usort(Clock). %% @doc an effecient format for disk / wire. %5 @see `from_binary/1` @@ -171,21 +193,28 @@ from_binary(Bin) -> %% @doc take two vclocks and return a vclock that summerizes only the %% events both have seen. --spec glb(vclock(), vclock()) -> vclock(). -glb(Clock1, Clock2) -> - Clock = lists:foldl(fun({Actor, Cnt}, GLB) -> - case lists:keyfind(Actor, 1, Clock2) of - false -> - GLB; - {Actor, Cnt2} when Cnt2 >= Cnt -> - [{Actor, Cnt} | GLB]; - {Actor, Cnt2} -> - [{Actor, Cnt2} | GLB] - end - end, - fresh(), - Clock1), - lists:sort(Clock). +-spec glb(vclock(), vclock()) -> sorted_vclock(). +glb(ClockA0, ClockB0) -> + ClockA1 = lists:usort(ClockA0), + ClockB1 = lists:usort(ClockB0), + glb(ClockA1, ClockB1, []). + +glb([], [], Acc) -> + lists:reverse(Acc); +glb(_, [], Acc) -> + glb([], [], Acc); +glb([], _, Acc) -> + glb([], [], Acc); +glb([Dot|RestClockA], [Dot|RestClockB], Acc) -> + glb(RestClockA, RestClockB, Acc); +glb([{Actor, CounterA}|RestClockA], [DotB = {Actor, CounterB}|RestClockB], Acc) when CounterA > CounterB -> + glb(RestClockA, RestClockB, [DotB|Acc]); +glb([DotA = {Actor, CounterA}|RestClockA], [{Actor, CounterB}|RestClockB], Acc) when CounterB > CounterA -> + glb(RestClockA, RestClockB, [DotA|Acc]); +glb([{ActorA, _}|RestClockA], ClockB = [{ActorB, _}|_RestClockB], Acc) when ActorA < ActorB -> + glb(RestClockA, ClockB, Acc); +glb(ClockA = [{ActorA, _}|_RestClockA], [{ActorB, _}|RestClockB], Acc) when ActorA > ActorB -> + glb(ClockA, RestClockB, Acc). %% =================================================================== %% EUnit tests @@ -246,4 +275,52 @@ merge_same_id_test() -> ?assertEqual([{<<"1">>, 1},{<<"2">>,1},{<<"3">>,1}], riak_dt_vclock:merge([VC1, VC2])). +random_clock1(N1, N2) -> + Seq0 = lists:seq(N1, N2), + Seq1 = [{rand:uniform(1000000), X} || X <- Seq0], + Seq2 = lists:sort(Seq1), + Seq3 = [X || {_, X} <- Seq2], + lists:map( + fun(I) -> + Actor = list_to_atom(lists:flatten(io_lib:format("actor-~b", [I]))), + {Actor, random:uniform(100000)} + end, + Seq3 + ). +get_time(Fun, Args) -> + T1 = os:timestamp(), + lists:foreach(fun(_) -> apply(Fun, Args) end, lists:seq(1, 10000)), + T2 = os:timestamp(), + DiffMicros = timer:now_diff(T2, T1), + round(DiffMicros / 10000.0). + +bench_test_() -> + {timeout, 300, [fun() -> bench() end]}. +bench() -> + A = random_clock1(1, 1000), + A1 = lists:usort(A), + B = random_clock1(1, 1000), + B1 = lists:usort(B), + C = random_clock1(500, 1500), + C1 = lists:usort(C), + D = random_clock1(1, 1500), + D1 = lists:usort(D), + + ?debugFmt("Increment Time (1): ~b~n", [get_time(fun increment/2, ['actor-500', A])]), + ?debugFmt("Increment Time (2): ~b~n", [get_time(fun increment/2, ['actor-500', A1])]), + ?debugFmt("Merge Time (1): ~b~n", [get_time(fun merge/1, [[B, A]])]), + ?debugFmt("Merge Time (2): ~b~n", [get_time(fun merge/1, [[B1, A1]])]), + ?debugFmt("Merge Time (3): ~b~n", [get_time(fun merge/1, [[A, C]])]), + ?debugFmt("Merge Time (4): ~b~n", [get_time(fun merge/1, [[A1, C1]])]), + ?debugFmt("Merge Time (5): ~b~n", [get_time(fun merge/1, [[A, A]])]), + ?debugFmt("Merge Time (6): ~b~n", [get_time(fun merge/1, [[A1, A1]])]), + ?debugFmt("Descends Time (1): ~b~n", [get_time(fun descends/2, [A, C])]), + ?debugFmt("Descends Time (2): ~b~n", [get_time(fun descends/2, [A1, C1])]), + ?debugFmt("Descends Time (2): ~b~n", [get_time(fun descends/2, [A, D])]), + ?debugFmt("Descends Time (3): ~b~n", [get_time(fun descends/2, [A1, D1])]), + ?debugFmt("Descends Time (4): ~b~n", [get_time(fun descends/2, [D, A])]), + ?debugFmt("Descends Time (5): ~b~n", [get_time(fun descends/2, [D1, A1])]), + ?debugFmt("Descends Time (6): ~b~n", [get_time(fun descends/2, [D, D])]), + ?debugFmt("Descends Time (7): ~b~n", [get_time(fun descends/2, [D1, D1])]). + -endif. From 27b261f6c1f5c519bbcf0bd3f469c4dfe4e0e604 Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Mon, 30 May 2016 07:05:30 -0700 Subject: [PATCH 03/15] Add special dot to indicate vclock features --- src/riak_dt_vclock.erl | 112 +++++++++++++++++++++++++++-------------- 1 file changed, 73 insertions(+), 39 deletions(-) diff --git a/src/riak_dt_vclock.erl b/src/riak_dt_vclock.erl index ab5fbd5..72d0fe3 100644 --- a/src/riak_dt_vclock.erl +++ b/src/riak_dt_vclock.erl @@ -40,6 +40,9 @@ -include_lib("eunit/include/eunit.hrl"). -endif. +%% The presence of the special actor indicates the list is sorted +%% This is a very "low" actor # that was chosen randomly, and smaller than -2**31 +-define(SPECIAL_DOT, {-34359738368, 1}). -export_type([vclock/0, vclock_node/0, binary_vclock/0]). -type vclock() :: [vc_entry()]. @@ -53,15 +56,36 @@ -type counter() :: non_neg_integer(). % @doc Create a brand new vclock. --spec fresh() -> vclock(). +-spec fresh() -> sorted_vclock(). fresh() -> - orddict:new(). + []. + +-spec(ensure_sorted(vclock()) -> sorted_vclock()). +ensure_sorted(VClock0) -> + case lists:member(?SPECIAL_DOT, VClock0) of + true -> + VClock0; + false -> + lists:usort([?SPECIAL_DOT|VClock0]) + end. +%% {SpecialActor, _} = ?SPECIAL_DOT, +%% case lists:keyfind(SpecialActor, 1, VClock0) of +%% {_, SpecialVersionCounter} when SpecialVersionCounter band 1 > 0 -> +%% VClock0; +%% Else -> +%% ?debugFmt("Found: ~p~n", [Else]), +%% After = lists:usort([?SPECIAL_DOT|VClock0]), +%% ?debugFmt("After: ~p~n", [After]), +%% After +%% +%% end. + % @doc Return true if Va is a direct descendant of Vb, else false -- remember, a vclock is its own descendant! -spec descends(Va :: vclock()|[], Vb :: vclock()|[]) -> boolean(). descends(Va0, Vb0) -> - Va1 = lists:usort(Va0), - Vb1 = lists:usort(Vb0), + Va1 = ensure_sorted(Va0), + Vb1 = ensure_sorted(Vb0), descends1(Va1, Vb1). @@ -78,8 +102,8 @@ descends1(_, _) -> % @doc Return true if Va strictly dominates Vb, else false! -spec dominates(vclock(), vclock()) -> boolean(). dominates(Va0, Vb0) -> - Va1 = lists:usort(Va0), - Vb1 = lists:usort(Vb0), + Va1 = ensure_sorted(Va0), + Vb1 = ensure_sorted(Vb0), dominates1(Va1, Vb1, false). -spec dominates1(sorted_vclock(), sorted_vclock(), boolean()) -> boolean(). @@ -104,8 +128,8 @@ dominates1(Va = [{ActorA, _CounterA}|_RestA], [{ActorB, _CounterB}|RestB], HasDo %% [{{b, 2}, {g, 22}] -spec subtract_dots(vclock(), vclock()) -> vclock(). subtract_dots(DotList0, VClock0) -> - DotList1 = lists:usort(DotList0), - VClock1 = lists:usort(VClock0), + DotList1 = ensure_sorted(DotList0), + VClock1 = ensure_sorted(VClock0), drop_dots(DotList1, VClock1, []). %% A - B @@ -128,22 +152,28 @@ drop_dots(A = [{ActorA, _CountA} | _RestA], [{ActorB, _CountB} | RestB], Acc) wh % common descendant. -spec merge(VClocks :: [vclock()]) -> vclock() | []. merge(VClocks0) -> - VClocks1 = lists:merge(VClocks0), - VClocks2 = lists:usort(VClocks1), - VClocks3 = - lists:foldl( - fun - ({Actor, ClockA}, [{Actor, ClockB}|Acc]) when ClockB >= ClockA -> - Acc; - (Dot = {Actor, ClockA}, [{Actor, ClockB}|Acc]) when ClockA > ClockB -> - [Dot|Acc]; - (Dot, Acc) -> - [Dot|Acc] - end, - fresh(), - VClocks2 - ), - lists:reverse(VClocks3). + [VClocks1|RestVClocks1] = lists:map(fun ensure_sorted/1, VClocks0), + lists:foldl(fun merge/2, VClocks1, RestVClocks1). + +merge(V1, V2) -> + merge(V1, V2, []). + +merge([], [], Acc) -> + lists:reverse(Acc); +merge([DotA = {Actor, CounterA}|RestVClockA], [_DotB = {Actor, CounterB}|RestVClockB], Acc) when CounterA >= CounterB -> + merge(RestVClockA, RestVClockB, [DotA|Acc]); +merge([_DotA = {Actor, CounterA}|RestVClockA], [DotB = {Actor, CounterB}|RestVClockB], Acc) when CounterA < CounterB -> + merge(RestVClockA, RestVClockB, [DotB|Acc]); +merge(VClockA = [{ActorA, _}|_RestVClockA], [DotB = {ActorB, _}|RestVClockB], Acc) when ActorA > ActorB-> + merge(VClockA, RestVClockB, [DotB|Acc]); +merge([DotA = {ActorA, _}|RestVClockA], VClockB = [{ActorB, _}|_RestVClockB], Acc) when ActorA < ActorB-> + merge(RestVClockA, VClockB, [DotA|Acc]); +merge([], VClockB, Acc) -> + lists:reverse(Acc) ++ VClockB; +merge(VClockA, [], Acc) -> + lists:reverse(Acc) ++ VClockA. + + % @doc Get the counter value in VClock set from Node. @@ -161,19 +191,18 @@ get_counter(Node, VClock) -> -spec increment(Node :: vclock_node(), VClock :: vclock()) -> vclock(). increment(Node, VClock0) -> - VClock1 = lists:usort(VClock0), + VClock1 = ensure_sorted(VClock0), orddict:update(Node, fun(X) -> X + 1 end, 1, VClock1). % @doc Return the list of all nodes that have ever incremented VClock. -spec all_nodes(VClock :: vclock()) -> [vclock_node()]. all_nodes(VClock0) -> - VClock1 = lists:usort(VClock0), - lists:usort([X || {X, _} <- VClock1]). + lists:usort([X || {X, _} <- VClock0]). % @doc Compares two VClocks for equality. -spec equal(VClockA :: vclock(), VClockB :: vclock()) -> boolean(). equal(VA,VB) -> - lists:usort(VA) =:= lists:usort(VB). + ensure_sorted(VA) =:= ensure_sorted(VB). %% @doc sorts the vclock by actor -spec sort(vclock()) -> vclock(). @@ -195,8 +224,8 @@ from_binary(Bin) -> %% events both have seen. -spec glb(vclock(), vclock()) -> sorted_vclock(). glb(ClockA0, ClockB0) -> - ClockA1 = lists:usort(ClockA0), - ClockB1 = lists:usort(ClockB0), + ClockA1 = ensure_sorted(ClockA0), + ClockB1 = ensure_sorted(ClockB0), glb(ClockA1, ClockB1, []). glb([], [], Acc) -> @@ -253,26 +282,26 @@ merge_test() -> {<<"4">>, 4}], VC2 = [{<<"3">>, 3}, {<<"4">>, 3}], - ?assertEqual([], merge(riak_dt_vclock:fresh())), - ?assertEqual([{<<"1">>,1},{<<"2">>,2},{<<"3">>,3},{<<"4">>,4}], + ?assertEqual([?SPECIAL_DOT], merge([riak_dt_vclock:fresh()])), + ?assertEqual([?SPECIAL_DOT, {<<"1">>,1},{<<"2">>,2},{<<"3">>,3},{<<"4">>,4}], merge([VC1, VC2])). merge_less_left_test() -> VC1 = [{<<"5">>, 5}], VC2 = [{<<"6">>, 6}, {<<"7">>, 7}], - ?assertEqual([{<<"5">>, 5},{<<"6">>, 6}, {<<"7">>, 7}], + ?assertEqual([?SPECIAL_DOT, {<<"5">>, 5},{<<"6">>, 6}, {<<"7">>, 7}], riak_dt_vclock:merge([VC1, VC2])). merge_less_right_test() -> VC1 = [{<<"6">>, 6}, {<<"7">>, 7}], VC2 = [{<<"5">>, 5}], - ?assertEqual([{<<"5">>, 5},{<<"6">>, 6}, {<<"7">>, 7}], + ?assertEqual([?SPECIAL_DOT, {<<"5">>, 5},{<<"6">>, 6}, {<<"7">>, 7}], riak_dt_vclock:merge([VC1, VC2])). merge_same_id_test() -> VC1 = [{<<"1">>, 1},{<<"2">>,1}], VC2 = [{<<"1">>, 1},{<<"3">>,1}], - ?assertEqual([{<<"1">>, 1},{<<"2">>,1},{<<"3">>,1}], + ?assertEqual([?SPECIAL_DOT, {<<"1">>, 1},{<<"2">>,1},{<<"3">>,1}], riak_dt_vclock:merge([VC1, VC2])). random_clock1(N1, N2) -> @@ -298,13 +327,15 @@ bench_test_() -> {timeout, 300, [fun() -> bench() end]}. bench() -> A = random_clock1(1, 1000), - A1 = lists:usort(A), + A1 = ensure_sorted(A), B = random_clock1(1, 1000), - B1 = lists:usort(B), + B1 = ensure_sorted(B), C = random_clock1(500, 1500), - C1 = lists:usort(C), + C1 = ensure_sorted(C), D = random_clock1(1, 1500), - D1 = lists:usort(D), + D1 = ensure_sorted(D), + E = random_clock1(1, 5), + E1 = ensure_sorted(E), ?debugFmt("Increment Time (1): ~b~n", [get_time(fun increment/2, ['actor-500', A])]), ?debugFmt("Increment Time (2): ~b~n", [get_time(fun increment/2, ['actor-500', A1])]), @@ -314,6 +345,9 @@ bench() -> ?debugFmt("Merge Time (4): ~b~n", [get_time(fun merge/1, [[A1, C1]])]), ?debugFmt("Merge Time (5): ~b~n", [get_time(fun merge/1, [[A, A]])]), ?debugFmt("Merge Time (6): ~b~n", [get_time(fun merge/1, [[A1, A1]])]), + ?debugFmt("Merge Time (7): ~b~n", [get_time(fun merge/1, [[E, E]])]), + ?debugFmt("Merge Time (8): ~b~n", [get_time(fun merge/1, [[E1, E1]])]), + ?debugFmt("Descends Time (1): ~b~n", [get_time(fun descends/2, [A, C])]), ?debugFmt("Descends Time (2): ~b~n", [get_time(fun descends/2, [A1, C1])]), ?debugFmt("Descends Time (2): ~b~n", [get_time(fun descends/2, [A, D])]), From df69cd4a4cb11d7bb97c2ea739c31e614d886bcf Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Thu, 30 Jun 2016 16:57:21 -0700 Subject: [PATCH 04/15] Fix vclock impl --- rebar.config | 11 ++++++ src/riak_dt_vclock.erl | 80 ++++++++++++++++++++++++++++-------------- 2 files changed, 64 insertions(+), 27 deletions(-) diff --git a/rebar.config b/rebar.config index 2ec3da2..a99ce2c 100644 --- a/rebar.config +++ b/rebar.config @@ -4,3 +4,14 @@ {eunit_opts, [verbose]}. {xref_checks, [undefined_function_calls]}. + +{profiles, [ + {bench, [ + {erl_opts, [ + debug_info, + warnings_as_errors, + {platform_define, "^[0-9]+", namespaced_types}, + {d, 'BENCH', true} + ]} + ]} +]}. \ No newline at end of file diff --git a/src/riak_dt_vclock.erl b/src/riak_dt_vclock.erl index 72d0fe3..d15b2c2 100644 --- a/src/riak_dt_vclock.erl +++ b/src/riak_dt_vclock.erl @@ -61,6 +61,10 @@ fresh() -> []. -spec(ensure_sorted(vclock()) -> sorted_vclock()). +-ifdef(TEST). +ensure_sorted(VClock0) -> + VClock0. +-else. ensure_sorted(VClock0) -> case lists:member(?SPECIAL_DOT, VClock0) of true -> @@ -68,6 +72,7 @@ ensure_sorted(VClock0) -> false -> lists:usort([?SPECIAL_DOT|VClock0]) end. +-endif. %% {SpecialActor, _} = ?SPECIAL_DOT, %% case lists:keyfind(SpecialActor, 1, VClock0) of %% {_, SpecialVersionCounter} when SpecialVersionCounter band 1 > 0 -> @@ -107,18 +112,24 @@ dominates(Va0, Vb0) -> dominates1(Va1, Vb1, false). -spec dominates1(sorted_vclock(), sorted_vclock(), boolean()) -> boolean(). -dominates1(_, [], true) -> true; +%% We've exhausted Vb, then Va dominates dominates1([], [], HasDominated) -> HasDominated; +dominates1(_, [], _) -> true; +%% We've exhausted Va, but Vb still has actors +dominates1([], _, _) -> false; +%% Simple domination dominates1([{Actor, CounterA}|RestA], [{Actor, CounterB}|RestB], _HasDominated) when CounterA > CounterB -> dominates1(RestA, RestB, true); +%% Equal actors, and counters dominates1([{Actor, Counter}|RestA], [{Actor, Counter}|RestB], HasDominated) -> dominates1(RestA, RestB, HasDominated); +%% The counter in CounterB is bigger than CounterA dominates1([{Actor, CounterA}|_RestA], [{Actor, CounterB}|_RestB], _HasDominated) when CounterB > CounterA -> false; -dominates1([{ActorA, _CounterA}|RestA], Vb = [{ActorB, _CounterB}|_RestB], HasDominated) when ActorB >= ActorA -> +dominates1([{ActorA, _CounterA}|RestA], Vb = [{ActorB, _CounterB}|_RestB], HasDominated) when ActorB > ActorA -> dominates1(RestA, Vb, HasDominated); -dominates1(Va = [{ActorA, _CounterA}|_RestA], [{ActorB, _CounterB}|RestB], HasDominated) when ActorA >= ActorB -> - dominates1(Va, RestB, HasDominated). +dominates1([{ActorA, _CounterA}|_RestA], [{ActorB, _CounterB}|_RestB], _HasDominated) when ActorA > ActorB -> + false. %% @doc subtract the VClock from the DotList. %% what this means is that any {actor(), count()} pair in @@ -282,47 +293,42 @@ merge_test() -> {<<"4">>, 4}], VC2 = [{<<"3">>, 3}, {<<"4">>, 3}], - ?assertEqual([?SPECIAL_DOT], merge([riak_dt_vclock:fresh()])), - ?assertEqual([?SPECIAL_DOT, {<<"1">>,1},{<<"2">>,2},{<<"3">>,3},{<<"4">>,4}], + ?assertEqual([], merge([riak_dt_vclock:fresh()])), + ?assertEqual([{<<"1">>,1},{<<"2">>,2},{<<"3">>,3},{<<"4">>,4}], merge([VC1, VC2])). merge_less_left_test() -> VC1 = [{<<"5">>, 5}], VC2 = [{<<"6">>, 6}, {<<"7">>, 7}], - ?assertEqual([?SPECIAL_DOT, {<<"5">>, 5},{<<"6">>, 6}, {<<"7">>, 7}], + ?assertEqual([{<<"5">>, 5},{<<"6">>, 6}, {<<"7">>, 7}], riak_dt_vclock:merge([VC1, VC2])). merge_less_right_test() -> VC1 = [{<<"6">>, 6}, {<<"7">>, 7}], VC2 = [{<<"5">>, 5}], - ?assertEqual([?SPECIAL_DOT, {<<"5">>, 5},{<<"6">>, 6}, {<<"7">>, 7}], + ?assertEqual([{<<"5">>, 5},{<<"6">>, 6}, {<<"7">>, 7}], riak_dt_vclock:merge([VC1, VC2])). merge_same_id_test() -> VC1 = [{<<"1">>, 1},{<<"2">>,1}], VC2 = [{<<"1">>, 1},{<<"3">>,1}], - ?assertEqual([?SPECIAL_DOT, {<<"1">>, 1},{<<"2">>,1},{<<"3">>,1}], + ?assertEqual([{<<"1">>, 1},{<<"2">>,1},{<<"3">>,1}], riak_dt_vclock:merge([VC1, VC2])). -random_clock1(N1, N2) -> - Seq0 = lists:seq(N1, N2), - Seq1 = [{rand:uniform(1000000), X} || X <- Seq0], - Seq2 = lists:sort(Seq1), - Seq3 = [X || {_, X} <- Seq2], - lists:map( - fun(I) -> - Actor = list_to_atom(lists:flatten(io_lib:format("actor-~b", [I]))), - {Actor, random:uniform(100000)} - end, - Seq3 - ). -get_time(Fun, Args) -> - T1 = os:timestamp(), - lists:foreach(fun(_) -> apply(Fun, Args) end, lists:seq(1, 10000)), - T2 = os:timestamp(), - DiffMicros = timer:now_diff(T2, T1), - round(DiffMicros / 10000.0). +% if Va strictly dominates Vb, else false! +dominates_test() -> + ?assertNot(dominates([], [])), + ?assert(dominates([{'minuteman@10.0.3.237',1}], [])), + ?assertNot(dominates([], [{a, 1}])), + ?assertNot(dominates([{a, 1}], [{b,1}])), + ?assertNot(dominates([{b, 1}], [{a,1}])), + ?assert(dominates([{a, 1}, {b,1}, {c, 1}, {d,1}], [{c, 1}])), + ?assertNot(dominates([{c, 1}], [{a, 1}, {b,1}, {c, 1}, {d,1}])), + ?assertNot(dominates([{a, 1}, {c, 1}], [{b, 1}])), + ?assertNot(dominates([{b, 1}], [{a, 1}, {c, 1}])). + +-ifdef(BENCH). bench_test_() -> {timeout, 300, [fun() -> bench() end]}. bench() -> @@ -357,4 +363,24 @@ bench() -> ?debugFmt("Descends Time (6): ~b~n", [get_time(fun descends/2, [D, D])]), ?debugFmt("Descends Time (7): ~b~n", [get_time(fun descends/2, [D1, D1])]). +random_clock1(N1, N2) -> + Seq0 = lists:seq(N1, N2), + Seq1 = [{rand:uniform(1000000), X} || X <- Seq0], + Seq2 = lists:sort(Seq1), + Seq3 = [X || {_, X} <- Seq2], + lists:map( + fun(I) -> + Actor = list_to_atom(lists:flatten(io_lib:format("actor-~b", [I]))), + {Actor, random:uniform(100000)} + end, + Seq3 + ). +get_time(Fun, Args) -> + T1 = os:timestamp(), + lists:foreach(fun(_) -> apply(Fun, Args) end, lists:seq(1, 10000)), + T2 = os:timestamp(), + DiffMicros = timer:now_diff(T2, T1), + round(DiffMicros / 10000.0). + +-endif. -endif. From 30c63f005502784d71be75ae34d36fe36710e51c Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Wed, 9 Nov 2016 19:02:42 -0800 Subject: [PATCH 05/15] Fix riak_dt_vclock test --- src/riak_dt_vclock.erl | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/riak_dt_vclock.erl b/src/riak_dt_vclock.erl index d15b2c2..8e7c428 100644 --- a/src/riak_dt_vclock.erl +++ b/src/riak_dt_vclock.erl @@ -147,7 +147,7 @@ subtract_dots(DotList0, VClock0) -> drop_dots([], _Clock, NewDots) -> lists:reverse(NewDots); drop_dots(A, [], NewDots) -> - lists:reverse(A) ++ lists:reverse(NewDots); + A ++ lists:reverse(NewDots); drop_dots([Dot = {Actor, CountA} | RestA], [{Actor, CountB} | RestB], Acc) when CountA > CountB -> drop_dots(RestA, RestB, [Dot|Acc]); drop_dots([{Actor, CountA} | RestA], [{Actor, CountB} | RestB], Acc) when CountA =< CountB -> @@ -328,6 +328,9 @@ dominates_test() -> ?assertNot(dominates([{a, 1}, {c, 1}], [{b, 1}])), ?assertNot(dominates([{b, 1}], [{a, 1}, {c, 1}])). +subtract_dots_test() -> + ?assertEqual([{a, 1}, {b, 2}], subtract_dots([{a, 1}, {b, 2}], [])). + -ifdef(BENCH). bench_test_() -> {timeout, 300, [fun() -> bench() end]}. From 6e803986f47f779df7bbf29bcf2b5192d420d25e Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Wed, 9 Nov 2016 23:14:49 -0800 Subject: [PATCH 06/15] Implement is_sorted, and merge functions in a NIF --- c_src/Makefile | 74 +++++++++++++++++++++++++++ c_src/riak_dt_vclock.c | 110 +++++++++++++++++++++++++++++++++++++++++ rebar.config | 9 +++- src/riak_dt_vclock.erl | 98 ++++++++++++++++++------------------ 4 files changed, 241 insertions(+), 50 deletions(-) create mode 100644 c_src/Makefile create mode 100644 c_src/riak_dt_vclock.c diff --git a/c_src/Makefile b/c_src/Makefile new file mode 100644 index 0000000..028c228 --- /dev/null +++ b/c_src/Makefile @@ -0,0 +1,74 @@ +# Based on c_src.mk from erlang.mk by Loic Hoguin + +CURDIR := $(shell pwd) +BASEDIR := $(abspath $(CURDIR)/..) + +PROJECT ?= $(notdir $(BASEDIR)) +PROJECT := $(strip $(PROJECT)) + +ERTS_INCLUDE_DIR ?= $(shell erl -noshell -s init stop -eval "io:format(\"~s/erts-~s/include/\", [code:root_dir(), erlang:system_info(version)]).") +ERL_INTERFACE_INCLUDE_DIR ?= $(shell erl -noshell -s init stop -eval "io:format(\"~s\", [code:lib_dir(erl_interface, include)]).") +ERL_INTERFACE_LIB_DIR ?= $(shell erl -noshell -s init stop -eval "io:format(\"~s\", [code:lib_dir(erl_interface, lib)]).") + +C_SRC_DIR = $(CURDIR) + +# System type and C compiler/flags. + +UNAME_SYS := $(shell uname -s) +ifeq ($(UNAME_SYS), Darwin) + CC ?= cc + CFLAGS ?= -O3 -std=c99 -arch x86_64 -finline-functions -Wall + CXXFLAGS ?= -O3 -arch x86_64 -finline-functions -Wall + LDFLAGS ?= -arch x86_64 -flat_namespace -undefined suppress +else ifeq ($(UNAME_SYS), FreeBSD) + CC ?= cc + CFLAGS ?= -O3 -std=c99 -finline-functions -Wall + CXXFLAGS ?= -O3 -finline-functions -Wall +else ifeq ($(UNAME_SYS), Linux) + CC ?= gcc + CFLAGS ?= -O3 -std=c99 -finline-functions -Wall + CXXFLAGS ?= -O3 -finline-functions -Wall +endif + +CFLAGS += -fPIC -I $(ERTS_INCLUDE_DIR) -I $(ERL_INTERFACE_INCLUDE_DIR) +CXXFLAGS += -fPIC -I $(ERTS_INCLUDE_DIR) -I $(ERL_INTERFACE_INCLUDE_DIR) + +LDLIBS += -L $(ERL_INTERFACE_LIB_DIR) -lerl_interface -lei +LDFLAGS += -shared + +# Verbosity. + +c_verbose_0 = @echo " C " $(?F); +c_verbose = $(c_verbose_$(V)) + +cpp_verbose_0 = @echo " CPP " $(?F); +cpp_verbose = $(cpp_verbose_$(V)) + +link_verbose_0 = @echo " LD " $(@F); +link_verbose = $(link_verbose_$(V)) + +SOURCES := $(shell find $(C_SRC_DIR) -type f \( -name "*.c" -o -name "*.C" -o -name "*.cc" -o -name "*.cpp" \)) +OBJECTS = $(addsuffix .o, $(basename $(SOURCES))) + +COMPILE_C = $(c_verbose) $(CC) $(CFLAGS) $(CPPFLAGS) -c +COMPILE_CPP = $(cpp_verbose) $(CXX) $(CXXFLAGS) $(CPPFLAGS) -c + +RIAK_DT_VCLOCK_SO = $(CURDIR)/../priv/riak_dt_vclock.so +$(RIAK_DT_VCLOCK_SO): riak_dt_vclock.o + @mkdir -p $(BASEDIR)/priv/ + $(link_verbose) $(CC) riak_dt_vclock.o $(LDFLAGS) $(LDLIBS) -o $(RIAK_DT_VCLOCK_SO) + +%.o: %.c + $(COMPILE_C) $(OUTPUT_OPTION) $< + +%.o: %.cc + $(COMPILE_CPP) $(OUTPUT_OPTION) $< + +%.o: %.C + $(COMPILE_CPP) $(OUTPUT_OPTION) $< + +%.o: %.cpp + $(COMPILE_CPP) $(OUTPUT_OPTION) $< + +clean: + @rm -f $(OBJECTS) $(CURDIR)/../priv/riak_dt_vclock.so \ No newline at end of file diff --git a/c_src/riak_dt_vclock.c b/c_src/riak_dt_vclock.c new file mode 100644 index 0000000..4085a2c --- /dev/null +++ b/c_src/riak_dt_vclock.c @@ -0,0 +1,110 @@ +#include "erl_nif.h" + +#include + + +static ERL_NIF_TERM atom_true; +static ERL_NIF_TERM atom_false; +static ERL_NIF_TERM badarg; + + + +static int load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) { + atom_true = enif_make_atom(env, "true"); + atom_false = enif_make_atom(env, "false"); + badarg = enif_make_badarg(env); + return 0; +} + +ERL_NIF_TERM is_sorted_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { + ERL_NIF_TERM head1, head2, tail, rest, list = argv[0]; + + if (!enif_is_list(env, list)) + return badarg; + + while(enif_get_list_cell(env, list, &head1, &tail) && enif_get_list_cell(env, tail, &head2, &rest)) { + if (enif_compare(head1, head2) >= 0) + return atom_false; + + list = tail; + } + + return atom_true; +} + +static inline ErlNifSInt64 max(ErlNifSInt64 i1, ErlNifSInt64 i2) { + if (i1 > i2) + return i1; + else + return i2; +} + +ERL_NIF_TERM merge2_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { + ERL_NIF_TERM headlhs, taillhs, listlhs = argv[0]; + ERL_NIF_TERM headrhs, tailrhs, listrhs = argv[1]; + ERL_NIF_TERM rretlist, retlist = enif_make_list(env, 0); + ERL_NIF_TERM newtuple; + + const ERL_NIF_TERM *tuplelhs, *tuplerhs; + int aritylhs, arityrhs, cmp; + ErlNifSInt64 vallhs, valrhs; + + /* We will never receive an empty list either on the left or right side */ + + + while (enif_get_list_cell(env, listlhs, &headlhs, &taillhs) && + enif_get_list_cell(env, listrhs, &headrhs, &tailrhs)) { + + if(!enif_get_tuple(env, headlhs, &aritylhs, &tuplelhs)) + return badarg; + if (aritylhs != 2) + return badarg; + + if(!enif_get_tuple(env, headrhs, &arityrhs, &tuplerhs)) + return badarg; + if (arityrhs != 2) + return badarg; + + cmp = enif_compare(tuplelhs[0], tuplerhs[0]); + if (cmp == 0) { + if (!enif_get_int64(env, tuplelhs[1], &vallhs)) + return badarg; + if (!enif_get_int64(env, tuplerhs[1], &valrhs)) + return badarg; + + newtuple = enif_make_tuple2(env, tuplelhs[0], enif_make_int64(env, max(vallhs, valrhs))); + retlist = enif_make_list_cell(env, newtuple, retlist); + /* Pop both lists */ + listlhs = taillhs; + listrhs = tailrhs; + } else if (cmp < 0) { /* lhs < rhs */ + /* Pop something off LHS to have it try to catch up with RHS */ + retlist = enif_make_list_cell(env, headlhs, retlist); + listlhs = taillhs; + } else if (cmp > 0) { /* lhs > rhs */ + /* Pop something off RHS to have it try to catch up with LHS */ + retlist = enif_make_list_cell(env, headrhs, retlist); + listrhs = tailrhs; + } + } + while (enif_get_list_cell(env, listlhs, &headlhs, &taillhs)) { + retlist = enif_make_list_cell(env, headlhs, retlist); + listlhs = taillhs; + } + + while (enif_get_list_cell(env, listrhs, &headrhs, &tailrhs)) { + retlist = enif_make_list_cell(env, headrhs, retlist); + listrhs = tailrhs; + } + + enif_make_reverse_list(env, retlist, &rretlist); + return rretlist; +} + +static ErlNifFunc nif_funcs[] = { + {"is_sorted", 1, is_sorted_nif}, + {"merge2", 2, merge2_nif}, + // {"increment2", 2, increment2_nif}, +}; + +ERL_NIF_INIT(riak_dt_vclock, nif_funcs, load, NULL, NULL, NULL) diff --git a/rebar.config b/rebar.config index a99ce2c..15ccd35 100644 --- a/rebar.config +++ b/rebar.config @@ -14,4 +14,11 @@ {d, 'BENCH', true} ]} ]} -]}. \ No newline at end of file +]}. + +{pre_hooks, + [{"(linux|darwin|solaris)", compile, "make -C c_src"}, + {"(freebsd)", compile, "gmake -C c_src"}]}. +{post_hooks, + [{"(linux|darwin|solaris)", clean, "make -C c_src clean"}, + {"(freebsd)", clean, "gmake -C c_src clean"}]}. \ No newline at end of file diff --git a/src/riak_dt_vclock.erl b/src/riak_dt_vclock.erl index 8e7c428..ad9a1b8 100644 --- a/src/riak_dt_vclock.erl +++ b/src/riak_dt_vclock.erl @@ -32,6 +32,9 @@ -module(riak_dt_vclock). -compile(inline_list_funcs). +-on_load(init/0). + + -export([fresh/0,descends/2,merge/1,get_counter/2, subtract_dots/2, increment/2,all_nodes/1, equal/2, to_binary/1, from_binary/1, dominates/2, glb/2]). @@ -40,9 +43,11 @@ -include_lib("eunit/include/eunit.hrl"). -endif. +-define(APPNAME, riak_dt). +-define(LIBNAME, riak_dt_vclock). + %% The presence of the special actor indicates the list is sorted %% This is a very "low" actor # that was chosen randomly, and smaller than -2**31 --define(SPECIAL_DOT, {-34359738368, 1}). -export_type([vclock/0, vclock_node/0, binary_vclock/0]). -type vclock() :: [vc_entry()]. @@ -55,36 +60,35 @@ -type vclock_node() :: term(). -type counter() :: non_neg_integer(). +init() -> + SoName = case code:priv_dir(?APPNAME) of + {error, bad_name} -> + case filelib:is_dir(filename:join(["..", priv])) of + true -> + filename:join(["..", priv, ?LIBNAME]); + _ -> + filename:join([priv, ?LIBNAME]) + end; + Dir -> + filename:join(Dir, ?LIBNAME) + end, + erlang:load_nif(SoName, 0). + + % @doc Create a brand new vclock. -spec fresh() -> sorted_vclock(). fresh() -> []. +is_sorted(_List) -> + erlang:nif_error({error, not_loaded}). + -spec(ensure_sorted(vclock()) -> sorted_vclock()). --ifdef(TEST). ensure_sorted(VClock0) -> - VClock0. --else. -ensure_sorted(VClock0) -> - case lists:member(?SPECIAL_DOT, VClock0) of - true -> - VClock0; - false -> - lists:usort([?SPECIAL_DOT|VClock0]) + case is_sorted(VClock0) of + true -> VClock0; + false -> lists:usort(VClock0) end. --endif. -%% {SpecialActor, _} = ?SPECIAL_DOT, -%% case lists:keyfind(SpecialActor, 1, VClock0) of -%% {_, SpecialVersionCounter} when SpecialVersionCounter band 1 > 0 -> -%% VClock0; -%% Else -> -%% ?debugFmt("Found: ~p~n", [Else]), -%% After = lists:usort([?SPECIAL_DOT|VClock0]), -%% ?debugFmt("After: ~p~n", [After]), -%% After -%% -%% end. - % @doc Return true if Va is a direct descendant of Vb, else false -- remember, a vclock is its own descendant! -spec descends(Va :: vclock()|[], Vb :: vclock()|[]) -> boolean(). @@ -166,26 +170,9 @@ merge(VClocks0) -> [VClocks1|RestVClocks1] = lists:map(fun ensure_sorted/1, VClocks0), lists:foldl(fun merge/2, VClocks1, RestVClocks1). -merge(V1, V2) -> - merge(V1, V2, []). - -merge([], [], Acc) -> - lists:reverse(Acc); -merge([DotA = {Actor, CounterA}|RestVClockA], [_DotB = {Actor, CounterB}|RestVClockB], Acc) when CounterA >= CounterB -> - merge(RestVClockA, RestVClockB, [DotA|Acc]); -merge([_DotA = {Actor, CounterA}|RestVClockA], [DotB = {Actor, CounterB}|RestVClockB], Acc) when CounterA < CounterB -> - merge(RestVClockA, RestVClockB, [DotB|Acc]); -merge(VClockA = [{ActorA, _}|_RestVClockA], [DotB = {ActorB, _}|RestVClockB], Acc) when ActorA > ActorB-> - merge(VClockA, RestVClockB, [DotB|Acc]); -merge([DotA = {ActorA, _}|RestVClockA], VClockB = [{ActorB, _}|_RestVClockB], Acc) when ActorA < ActorB-> - merge(RestVClockA, VClockB, [DotA|Acc]); -merge([], VClockB, Acc) -> - lists:reverse(Acc) ++ VClockB; -merge(VClockA, [], Acc) -> - lists:reverse(Acc) ++ VClockA. - - +merge(V1, V2) -> merge2(V1, V2). +merge2(_V1, _V2) -> erlang:nif_error({error, not_loaded}). % @doc Get the counter value in VClock set from Node. -spec get_counter(Node :: vclock_node(), VClock :: vclock()) -> counter(). @@ -203,7 +190,9 @@ get_counter(Node, VClock) -> VClock :: vclock()) -> vclock(). increment(Node, VClock0) -> VClock1 = ensure_sorted(VClock0), - orddict:update(Node, fun(X) -> X + 1 end, 1, VClock1). + orddict:update_counter(Node, 1, VClock1). + +%%increment2(_Node, _VClock1) -> erlang:nif_error({error, not_loaded}). % @doc Return the list of all nodes that have ever incremented VClock. -spec all_nodes(VClock :: vclock()) -> [vclock_node()]. @@ -331,19 +320,29 @@ dominates_test() -> subtract_dots_test() -> ?assertEqual([{a, 1}, {b, 2}], subtract_dots([{a, 1}, {b, 2}], [])). +is_sorted_test() -> + ?assert(is_sorted([1,2,3])), + ?assertNot(is_sorted([1,1])), + ?assertNot(is_sorted([1,2,1])), + ?assertNot(is_sorted([1,1])), + ?assertNot(is_sorted([2,2])), + ?assert(is_sorted([1,2])), + ?assert(is_sorted([1])), + ?assert(is_sorted([])). + -ifdef(BENCH). bench_test_() -> {timeout, 300, [fun() -> bench() end]}. bench() -> - A = random_clock1(1, 1000), + A = random_clock1(1, 2000), A1 = ensure_sorted(A), - B = random_clock1(1, 1000), + B = random_clock1(1, 2000), B1 = ensure_sorted(B), - C = random_clock1(500, 1500), + C = random_clock1(500, 3000), C1 = ensure_sorted(C), - D = random_clock1(1, 1500), + D = random_clock1(1, 3000), D1 = ensure_sorted(D), - E = random_clock1(1, 5), + E = random_clock1(1, 10), E1 = ensure_sorted(E), ?debugFmt("Increment Time (1): ~b~n", [get_time(fun increment/2, ['actor-500', A])]), @@ -374,7 +373,7 @@ random_clock1(N1, N2) -> lists:map( fun(I) -> Actor = list_to_atom(lists:flatten(io_lib:format("actor-~b", [I]))), - {Actor, random:uniform(100000)} + {Actor, rand:uniform(100000)} end, Seq3 ). @@ -385,5 +384,6 @@ get_time(Fun, Args) -> DiffMicros = timer:now_diff(T2, T1), round(DiffMicros / 10000.0). + -endif. -endif. From 0138fe08bd8950a938c51b98b223566a697b4c5c Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Wed, 9 Nov 2016 23:16:08 -0800 Subject: [PATCH 07/15] Add *.o (object files) to gitignore --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index e83ee2d..334d26c 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,5 @@ rel/riak_dt data/* .qc/* .eqc-info -current_counterexample.eqc \ No newline at end of file +current_counterexample.eqc +*.o From 16ab1ad200ac530ed84e48a3c61dcea4cdef1a26 Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Wed, 9 Nov 2016 23:16:25 -0800 Subject: [PATCH 08/15] Add small comment around TODO --- c_src/riak_dt_vclock.c | 1 + 1 file changed, 1 insertion(+) diff --git a/c_src/riak_dt_vclock.c b/c_src/riak_dt_vclock.c index 4085a2c..0b84d78 100644 --- a/c_src/riak_dt_vclock.c +++ b/c_src/riak_dt_vclock.c @@ -97,6 +97,7 @@ ERL_NIF_TERM merge2_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { listrhs = tailrhs; } + /* TODO: Maybe have some heuristic here to determine whether to do a reverse here, or in Erlang? */ enif_make_reverse_list(env, retlist, &rretlist); return rretlist; } From 690248c0d477c5ccf2405081342b20079f90be50 Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Thu, 10 Nov 2016 00:28:07 -0800 Subject: [PATCH 09/15] Further c-ification --- c_src/riak_dt_vclock.c | 118 +++++++++++++++++++++++++++++++++++++++-- src/riak_dt_vclock.erl | 95 +++++++++++++++++---------------- 2 files changed, 164 insertions(+), 49 deletions(-) diff --git a/c_src/riak_dt_vclock.c b/c_src/riak_dt_vclock.c index 0b84d78..e8fd9ad 100644 --- a/c_src/riak_dt_vclock.c +++ b/c_src/riak_dt_vclock.c @@ -18,12 +18,21 @@ static int load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) { ERL_NIF_TERM is_sorted_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { ERL_NIF_TERM head1, head2, tail, rest, list = argv[0]; + const ERL_NIF_TERM *tuple1, *tuple2; + int arity1, arity2; if (!enif_is_list(env, list)) return badarg; + while(enif_get_list_cell(env, list, &head1, &tail) && enif_get_list_cell(env, tail, &head2, &rest)) { - if (enif_compare(head1, head2) >= 0) + if(!enif_get_tuple(env, head1, &arity1, &tuple1) || !enif_get_tuple(env, head2, &arity2, &tuple2)) + return badarg; + + if (arity1 == 0 || arity2 == 0) + return badarg; + + if (enif_compare(tuple1[0], tuple2[0]) >= 0) return atom_false; list = tail; @@ -51,7 +60,6 @@ ERL_NIF_TERM merge2_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { /* We will never receive an empty list either on the left or right side */ - while (enif_get_list_cell(env, listlhs, &headlhs, &taillhs) && enif_get_list_cell(env, listrhs, &headrhs, &tailrhs)) { @@ -102,10 +110,114 @@ ERL_NIF_TERM merge2_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { return rretlist; } +/* Return true if Lhs is a direct descendant of Rhs, else false -- remember, a vclock is its own descendant! */ + +ERL_NIF_TERM descends2_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { + ERL_NIF_TERM headlhs, taillhs, listlhs = argv[0]; + ERL_NIF_TERM headrhs, tailrhs, listrhs = argv[1]; + + const ERL_NIF_TERM *tuplelhs, *tuplerhs; + int aritylhs, arityrhs, cmp; + ErlNifSInt64 vallhs, valrhs; + + while (enif_get_list_cell(env, listlhs, &headlhs, &taillhs) && + enif_get_list_cell(env, listrhs, &headrhs, &tailrhs)) { + + if(!enif_get_tuple(env, headlhs, &aritylhs, &tuplelhs)) + return badarg; + if (aritylhs != 2) + return badarg; + + if(!enif_get_tuple(env, headrhs, &arityrhs, &tuplerhs)) + return badarg; + if (arityrhs != 2) + return badarg; + cmp = enif_compare(tuplelhs[0], tuplerhs[0]); + if (cmp == 0) { + if (!enif_get_int64(env, tuplelhs[1], &vallhs)) + return badarg; + if (!enif_get_int64(env, tuplerhs[1], &valrhs)) + return badarg; + if (vallhs < valrhs) + return atom_false; + listlhs = taillhs; + listrhs = tailrhs; + } else if (cmp < 0) { + /* We have an extra actor on the lhs, drop it */ + listlhs = taillhs; + } else + return atom_false; + } + /* Make sure that RHS is empty */ + if(enif_get_list_cell(env, listrhs, &headrhs, &tailrhs)) + return atom_false; + + return atom_true; +} + +ERL_NIF_TERM drop_dots_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { + ERL_NIF_TERM headlhs, taillhs, listlhs = argv[0]; + ERL_NIF_TERM headrhs, tailrhs, listrhs = argv[1]; + ERL_NIF_TERM rretlist, retlist = enif_make_list(env, 0); + ERL_NIF_TERM newtuple; + + const ERL_NIF_TERM *tuplelhs, *tuplerhs; + int aritylhs, arityrhs, cmp; + ErlNifSInt64 vallhs, valrhs; + + /* We will never receive an empty list either on the left or right side */ + + while (enif_get_list_cell(env, listlhs, &headlhs, &taillhs) && + enif_get_list_cell(env, listrhs, &headrhs, &tailrhs)) { + + if(!enif_get_tuple(env, headlhs, &aritylhs, &tuplelhs)) + return badarg; + if (aritylhs != 2) + return badarg; + + if(!enif_get_tuple(env, headrhs, &arityrhs, &tuplerhs)) + return badarg; + if (arityrhs != 2) + return badarg; + + cmp = enif_compare(tuplelhs[0], tuplerhs[0]); + if (cmp == 0) { + if (!enif_get_int64(env, tuplelhs[1], &vallhs)) + return badarg; + if (!enif_get_int64(env, tuplerhs[1], &valrhs)) + return badarg; + if (vallhs > valrhs) { + newtuple = enif_make_tuple2(env, tuplelhs[0], enif_make_int64(env, vallhs)); + retlist = enif_make_list_cell(env, newtuple, retlist); + } + /* Pop both lists */ + listlhs = taillhs; + listrhs = tailrhs; + } else if (cmp < 0) { /* lhs < rhs */ + /* Pop something off LHS to have it try to catch up with RHS */ + retlist = enif_make_list_cell(env, headlhs, retlist); + listlhs = taillhs; + } else if (cmp > 0) { /* lhs > rhs */ + /* Pop something off RHS to have it try to catch up with LHS */ + listrhs = tailrhs; + } + } + while (enif_get_list_cell(env, listlhs, &headlhs, &taillhs)) { + retlist = enif_make_list_cell(env, headlhs, retlist); + listlhs = taillhs; + } + + /* TODO: Maybe have some heuristic here to determine whether to do a reverse here, or in Erlang? */ + enif_make_reverse_list(env, retlist, &rretlist); + return rretlist; +} + + static ErlNifFunc nif_funcs[] = { + {"descends2", 2, descends2_nif}, {"is_sorted", 1, is_sorted_nif}, {"merge2", 2, merge2_nif}, - // {"increment2", 2, increment2_nif}, + {"drop_dots", 2, drop_dots_nif}, }; ERL_NIF_INIT(riak_dt_vclock, nif_funcs, load, NULL, NULL, NULL) diff --git a/src/riak_dt_vclock.erl b/src/riak_dt_vclock.erl index ad9a1b8..20b4e5e 100644 --- a/src/riak_dt_vclock.erl +++ b/src/riak_dt_vclock.erl @@ -84,10 +84,16 @@ is_sorted(_List) -> erlang:nif_error({error, not_loaded}). -spec(ensure_sorted(vclock()) -> sorted_vclock()). +%ensure_sorted([]) -> +% case is_sorted([]) of +% _ -> [] +% end; +%ensure_sorted(X) -> X. ensure_sorted(VClock0) -> case is_sorted(VClock0) of true -> VClock0; - false -> lists:usort(VClock0) + false -> + lists:usort(VClock0) end. % @doc Return true if Va is a direct descendant of Vb, else false -- remember, a vclock is its own descendant! @@ -95,18 +101,12 @@ ensure_sorted(VClock0) -> descends(Va0, Vb0) -> Va1 = ensure_sorted(Va0), Vb1 = ensure_sorted(Vb0), - descends1(Va1, Vb1). + descends2(Va1, Vb1). --spec descends1(Va :: sorted_vclock()|[], Vb :: sorted_vclock()|[]) -> boolean(). -descends1(_, []) -> - true; -descends1([{Actor, CounterA}|RestA], [{Actor, CounterB}|RestB]) when CounterA >= CounterB -> - descends1(RestA, RestB); -descends1([{ActorA, _CounterA}|RestA], Vb = [{ActorB, _CounterB}|_RestB]) when ActorB >= ActorA -> - descends1(RestA, Vb); -descends1(_, _) -> - false. +-spec descends2(Va :: sorted_vclock()|[], Vb :: sorted_vclock()|[]) -> boolean(). +descends2(_Va, _Vb) -> + erlang:nif_error({error, not_loaded}). % @doc Return true if Va strictly dominates Vb, else false! -spec dominates(vclock(), vclock()) -> boolean(). @@ -145,21 +145,23 @@ dominates1([{ActorA, _CounterA}|_RestA], [{ActorB, _CounterB}|_RestB], _HasDomin subtract_dots(DotList0, VClock0) -> DotList1 = ensure_sorted(DotList0), VClock1 = ensure_sorted(VClock0), - drop_dots(DotList1, VClock1, []). + drop_dots(DotList1, VClock1). +drop_dots(_DotList, _VClock) -> + erlang:nif_error({error, not_loaded}). %% A - B -drop_dots([], _Clock, NewDots) -> - lists:reverse(NewDots); -drop_dots(A, [], NewDots) -> - A ++ lists:reverse(NewDots); -drop_dots([Dot = {Actor, CountA} | RestA], [{Actor, CountB} | RestB], Acc) when CountA > CountB -> - drop_dots(RestA, RestB, [Dot|Acc]); -drop_dots([{Actor, CountA} | RestA], [{Actor, CountB} | RestB], Acc) when CountA =< CountB -> - drop_dots(RestA, RestB, Acc); -drop_dots([Dot = {ActorA, _CountA} | RestA], B = [{ActorB, _CountB} | _RestB], Acc) when ActorB > ActorA -> - drop_dots(RestA, B, [Dot|Acc]); -drop_dots(A = [{ActorA, _CountA} | _RestA], [{ActorB, _CountB} | RestB], Acc) when ActorA > ActorB -> - drop_dots(A, RestB, Acc). +%%drop_dots([], _Clock, NewDots) -> +%% lists:reverse(NewDots); +%%drop_dots(A, [], NewDots) -> +%% A ++ lists:reverse(NewDots); +%%drop_dots([Dot = {Actor, CountA} | RestA], [{Actor, CountB} | RestB], Acc) when CountA > CountB -> +%% drop_dots(RestA, RestB, [Dot|Acc]); +%%drop_dots([{Actor, CountA} | RestA], [{Actor, CountB} | RestB], Acc) when CountA =< CountB -> +%% drop_dots(RestA, RestB, Acc); +%%drop_dots([Dot = {ActorA, _CountA} | RestA], B = [{ActorB, _CountB} | _RestB], Acc) when ActorB > ActorA -> +%% drop_dots(RestA, B, [Dot|Acc]); +%%drop_dots(A = [{ActorA, _CountA} | _RestA], [{ActorB, _CountB} | RestB], Acc) when ActorA > ActorB -> +%% drop_dots(A, RestB, Acc). @@ -207,7 +209,7 @@ equal(VA,VB) -> %% @doc sorts the vclock by actor -spec sort(vclock()) -> vclock(). sort(Clock) -> - lists:usort(Clock). + ensure_sorted(Clock). %% @doc an effecient format for disk / wire. %5 @see `from_binary/1` @@ -321,13 +323,14 @@ subtract_dots_test() -> ?assertEqual([{a, 1}, {b, 2}], subtract_dots([{a, 1}, {b, 2}], [])). is_sorted_test() -> - ?assert(is_sorted([1,2,3])), - ?assertNot(is_sorted([1,1])), - ?assertNot(is_sorted([1,2,1])), - ?assertNot(is_sorted([1,1])), - ?assertNot(is_sorted([2,2])), - ?assert(is_sorted([1,2])), - ?assert(is_sorted([1])), + ?assertNot(is_sorted([{1},{2},{3},{2}])), + ?assert(is_sorted([{1},{2},{3}])), + ?assertNot(is_sorted([{1},{1}])), + ?assertNot(is_sorted([{1},{2},{1}])), + ?assertNot(is_sorted([{1},{1}])), + ?assertNot(is_sorted([{2},{2}])), + ?assert(is_sorted([{1},{2}])), + ?assert(is_sorted([{1}])), ?assert(is_sorted([])). -ifdef(BENCH). @@ -335,34 +338,34 @@ bench_test_() -> {timeout, 300, [fun() -> bench() end]}. bench() -> A = random_clock1(1, 2000), - A1 = ensure_sorted(A), + A1 = lists:usort(A), B = random_clock1(1, 2000), - B1 = ensure_sorted(B), + B1 = lists:usort(B), C = random_clock1(500, 3000), - C1 = ensure_sorted(C), + C1 = lists:usort(C), D = random_clock1(1, 3000), - D1 = ensure_sorted(D), + D1 = lists:usort(D), E = random_clock1(1, 10), - E1 = ensure_sorted(E), + E1 = lists:usort(E), - ?debugFmt("Increment Time (1): ~b~n", [get_time(fun increment/2, ['actor-500', A])]), +% ?debugFmt("Increment Time (1): ~b~n", [get_time(fun increment/2, ['actor-500', A])]), ?debugFmt("Increment Time (2): ~b~n", [get_time(fun increment/2, ['actor-500', A1])]), - ?debugFmt("Merge Time (1): ~b~n", [get_time(fun merge/1, [[B, A]])]), + %?debugFmt("Merge Time (1): ~b~n", [get_time(fun merge/1, [[B, A]])]), ?debugFmt("Merge Time (2): ~b~n", [get_time(fun merge/1, [[B1, A1]])]), - ?debugFmt("Merge Time (3): ~b~n", [get_time(fun merge/1, [[A, C]])]), + %?debugFmt("Merge Time (3): ~b~n", [get_time(fun merge/1, [[A, C]])]), ?debugFmt("Merge Time (4): ~b~n", [get_time(fun merge/1, [[A1, C1]])]), - ?debugFmt("Merge Time (5): ~b~n", [get_time(fun merge/1, [[A, A]])]), + %?debugFmt("Merge Time (5): ~b~n", [get_time(fun merge/1, [[A, A]])]), ?debugFmt("Merge Time (6): ~b~n", [get_time(fun merge/1, [[A1, A1]])]), - ?debugFmt("Merge Time (7): ~b~n", [get_time(fun merge/1, [[E, E]])]), + %?debugFmt("Merge Time (7): ~b~n", [get_time(fun merge/1, [[E, E]])]), ?debugFmt("Merge Time (8): ~b~n", [get_time(fun merge/1, [[E1, E1]])]), - ?debugFmt("Descends Time (1): ~b~n", [get_time(fun descends/2, [A, C])]), + %?debugFmt("Descends Time (1): ~b~n", [get_time(fun descends/2, [A, C])]), ?debugFmt("Descends Time (2): ~b~n", [get_time(fun descends/2, [A1, C1])]), - ?debugFmt("Descends Time (2): ~b~n", [get_time(fun descends/2, [A, D])]), + %?debugFmt("Descends Time (2): ~b~n", [get_time(fun descends/2, [A, D])]), ?debugFmt("Descends Time (3): ~b~n", [get_time(fun descends/2, [A1, D1])]), - ?debugFmt("Descends Time (4): ~b~n", [get_time(fun descends/2, [D, A])]), + %?debugFmt("Descends Time (4): ~b~n", [get_time(fun descends/2, [D, A])]), ?debugFmt("Descends Time (5): ~b~n", [get_time(fun descends/2, [D1, A1])]), - ?debugFmt("Descends Time (6): ~b~n", [get_time(fun descends/2, [D, D])]), + %?debugFmt("Descends Time (6): ~b~n", [get_time(fun descends/2, [D, D])]), ?debugFmt("Descends Time (7): ~b~n", [get_time(fun descends/2, [D1, D1])]). random_clock1(N1, N2) -> From 96d99296d07273c8f50d8ee68fbce26cfee286a8 Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Thu, 10 Nov 2016 00:42:39 -0800 Subject: [PATCH 10/15] Optimize is_sorted function further --- c_src/riak_dt_vclock.c | 34 ++++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/c_src/riak_dt_vclock.c b/c_src/riak_dt_vclock.c index e8fd9ad..975652d 100644 --- a/c_src/riak_dt_vclock.c +++ b/c_src/riak_dt_vclock.c @@ -21,24 +21,30 @@ ERL_NIF_TERM is_sorted_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) const ERL_NIF_TERM *tuple1, *tuple2; int arity1, arity2; - if (!enif_is_list(env, list)) - return badarg; - - - while(enif_get_list_cell(env, list, &head1, &tail) && enif_get_list_cell(env, tail, &head2, &rest)) { - if(!enif_get_tuple(env, head1, &arity1, &tuple1) || !enif_get_tuple(env, head2, &arity2, &tuple2)) + if(enif_get_list_cell(env, list, &head1, &tail)) { + if(!enif_get_tuple(env, head1, &arity1, &tuple1)) return badarg; - - if (arity1 == 0 || arity2 == 0) + if(arity1 == 0) return badarg; - if (enif_compare(tuple1[0], tuple2[0]) >= 0) - return atom_false; - - list = tail; - } + while(enif_get_list_cell(env, tail, &head2, &rest)) { + if(!enif_get_tuple(env, head2, &arity2, &tuple2)) + return badarg; + if (arity2 == 0) + return badarg; + if (enif_compare(tuple1[0], tuple2[0]) >= 0) + return atom_false; - return atom_true; + tuple1 = tuple2; + arity1 = arity2; + head1 = head2; + tail = rest; + } + return atom_true; + } else if (enif_is_empty_list(env, list)) + return atom_true; + else + return badarg; } static inline ErlNifSInt64 max(ErlNifSInt64 i1, ErlNifSInt64 i2) { From 2dbc1e05b75dd6d7a0b0d44acc8c4b1de9e1f3f7 Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Thu, 10 Nov 2016 01:00:47 -0800 Subject: [PATCH 11/15] Finish up dominates implementation --- c_src/riak_dt_vclock.c | 61 ++++++++++++++++++++++++++++++++++++++++-- src/riak_dt_vclock.erl | 40 +++------------------------ 2 files changed, 63 insertions(+), 38 deletions(-) diff --git a/c_src/riak_dt_vclock.c b/c_src/riak_dt_vclock.c index 975652d..009b608 100644 --- a/c_src/riak_dt_vclock.c +++ b/c_src/riak_dt_vclock.c @@ -1,6 +1,5 @@ #include "erl_nif.h" - -#include +#include static ERL_NIF_TERM atom_true; @@ -218,12 +217,70 @@ ERL_NIF_TERM drop_dots_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) return rretlist; } +ERL_NIF_TERM dominates2_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { + ERL_NIF_TERM headlhs, taillhs, listlhs = argv[0]; + ERL_NIF_TERM headrhs, tailrhs, listrhs = argv[1]; + bool has_dominated = false; + + const ERL_NIF_TERM *tuplelhs, *tuplerhs; + int aritylhs, arityrhs, cmp; + ErlNifSInt64 vallhs, valrhs; + + while (enif_get_list_cell(env, listlhs, &headlhs, &taillhs) && + enif_get_list_cell(env, listrhs, &headrhs, &tailrhs)) { + + if(!enif_get_tuple(env, headlhs, &aritylhs, &tuplelhs)) + return badarg; + if (aritylhs != 2) + return badarg; + + if(!enif_get_tuple(env, headrhs, &arityrhs, &tuplerhs)) + return badarg; + if (arityrhs != 2) + return badarg; + + cmp = enif_compare(tuplelhs[0], tuplerhs[0]); + if (!enif_get_int64(env, tuplelhs[1], &vallhs)) + return badarg; + if (!enif_get_int64(env, tuplerhs[1], &valrhs)) + return badarg; + + /* Extra actor on left hand side, ignore it */ + if (cmp < 0) { + listlhs = taillhs; + } else if (cmp > 0) { + return atom_false; + } else { + if (vallhs < valrhs) + return atom_false; + else if(vallhs > valrhs) + has_dominated = true; + listlhs = taillhs; + listrhs = tailrhs; + } + } + + /* Both sides are empty */ + if(enif_is_empty_list(env, listlhs) && enif_is_empty_list(env, listrhs)) { + if(has_dominated) + return atom_true; + else + return atom_false; + /* LHS is not empty, but RHS is empty */ + } else if(!enif_is_empty_list(env, listlhs) && enif_is_empty_list(env, listrhs)) { + return atom_true; + /* LHS is empty, but RHS is not empty */ + } else { + return atom_false; + } +} static ErlNifFunc nif_funcs[] = { {"descends2", 2, descends2_nif}, {"is_sorted", 1, is_sorted_nif}, {"merge2", 2, merge2_nif}, {"drop_dots", 2, drop_dots_nif}, + {"dominates2", 2, dominates2_nif} }; ERL_NIF_INIT(riak_dt_vclock, nif_funcs, load, NULL, NULL, NULL) diff --git a/src/riak_dt_vclock.erl b/src/riak_dt_vclock.erl index 20b4e5e..120a388 100644 --- a/src/riak_dt_vclock.erl +++ b/src/riak_dt_vclock.erl @@ -113,27 +113,10 @@ descends2(_Va, _Vb) -> dominates(Va0, Vb0) -> Va1 = ensure_sorted(Va0), Vb1 = ensure_sorted(Vb0), - dominates1(Va1, Vb1, false). - --spec dominates1(sorted_vclock(), sorted_vclock(), boolean()) -> boolean(). -%% We've exhausted Vb, then Va dominates -dominates1([], [], HasDominated) -> HasDominated; -dominates1(_, [], _) -> true; -%% We've exhausted Va, but Vb still has actors -dominates1([], _, _) -> false; -%% Simple domination -dominates1([{Actor, CounterA}|RestA], [{Actor, CounterB}|RestB], _HasDominated) when CounterA > CounterB -> - dominates1(RestA, RestB, true); -%% Equal actors, and counters -dominates1([{Actor, Counter}|RestA], [{Actor, Counter}|RestB], HasDominated) -> - dominates1(RestA, RestB, HasDominated); -%% The counter in CounterB is bigger than CounterA -dominates1([{Actor, CounterA}|_RestA], [{Actor, CounterB}|_RestB], _HasDominated) when CounterB > CounterA -> - false; -dominates1([{ActorA, _CounterA}|RestA], Vb = [{ActorB, _CounterB}|_RestB], HasDominated) when ActorB > ActorA -> - dominates1(RestA, Vb, HasDominated); -dominates1([{ActorA, _CounterA}|_RestA], [{ActorB, _CounterB}|_RestB], _HasDominated) when ActorA > ActorB -> - false. + dominates2(Va1, Vb1). + +dominates2(_Va, _Vb) -> + erlang:nif_error({error, not_loaded}). %% @doc subtract the VClock from the DotList. %% what this means is that any {actor(), count()} pair in @@ -149,21 +132,6 @@ subtract_dots(DotList0, VClock0) -> drop_dots(_DotList, _VClock) -> erlang:nif_error({error, not_loaded}). -%% A - B -%%drop_dots([], _Clock, NewDots) -> -%% lists:reverse(NewDots); -%%drop_dots(A, [], NewDots) -> -%% A ++ lists:reverse(NewDots); -%%drop_dots([Dot = {Actor, CountA} | RestA], [{Actor, CountB} | RestB], Acc) when CountA > CountB -> -%% drop_dots(RestA, RestB, [Dot|Acc]); -%%drop_dots([{Actor, CountA} | RestA], [{Actor, CountB} | RestB], Acc) when CountA =< CountB -> -%% drop_dots(RestA, RestB, Acc); -%%drop_dots([Dot = {ActorA, _CountA} | RestA], B = [{ActorB, _CountB} | _RestB], Acc) when ActorB > ActorA -> -%% drop_dots(RestA, B, [Dot|Acc]); -%%drop_dots(A = [{ActorA, _CountA} | _RestA], [{ActorB, _CountB} | RestB], Acc) when ActorA > ActorB -> -%% drop_dots(A, RestB, Acc). - - % @doc Combine all VClocks in the input list into their least possible % common descendant. From 26247f2115f7e23311686753e601af4b1021fa36 Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Thu, 10 Nov 2016 02:18:11 -0800 Subject: [PATCH 12/15] Use dict for intermediate fold operation --- src/riak_dt_orswot.erl | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/riak_dt_orswot.erl b/src/riak_dt_orswot.erl index 6cc83a7..46e1b14 100644 --- a/src/riak_dt_orswot.erl +++ b/src/riak_dt_orswot.erl @@ -279,8 +279,7 @@ merge({LHSClock, LHSEntries, LHSDeferred}=LHS, {RHSClock, RHSEntries, RHSDeferre Entries = merge_disjoint_keys(RHSUnique, RHSEntries, LHSClock, Entries0), Deffered = merge_deferred(LHSDeferred, RHSDeferred), - - apply_deferred(Clock, Entries, Deffered). + apply_deferred(Clock, dict:to_list(Entries), Deffered). %% @private merge the deffered operations for both sets. -spec merge_deferred(deferred(), deferred()) -> deferred(). @@ -307,16 +306,17 @@ apply_deferred(Clock, Entries, Deferred) -> %% @doc check if each element in `Entries' should be in the merged %% set. -spec merge_disjoint_keys(set(), orddict:orddict(), - riak_dt_vclock:vclock(), orddict:orddict()) -> orddict:orddict(). -merge_disjoint_keys(Keys, Entries, SetClock, Accumulator) -> + riak_dt_vclock:vclock(), dict:dict()) -> dict:dict(). +merge_disjoint_keys(Keys, Entries0, SetClock, Accumulator) -> + Entries1 = dict:from_list(Entries0), sets:fold(fun(Key, Acc) -> - Dots = orddict:fetch(Key, Entries), + Dots = dict:fetch(Key, Entries1), case riak_dt_vclock:descends(SetClock, Dots) of false -> %% Optimise the set of stored dots to %% include only those unseen NewDots = riak_dt_vclock:subtract_dots(Dots, SetClock), - orddict:store(Key, NewDots, Acc); + dict:store(Key, NewDots, Acc); true -> Acc end @@ -327,7 +327,7 @@ merge_disjoint_keys(Keys, Entries, SetClock, Accumulator) -> %% @doc merges the minimal clocks for the common entries in both sets. -spec merge_common_keys(set(), {riak_dt_vclock:vclock(), entries(), deferred()}, {riak_dt_vclock:vclock(), entries(), deferred()}) -> - orddict:orddict(). + dict:dict(). merge_common_keys(CommonKeys, {LHSClock, LHSEntries, _}, {RHSClock, RHSEntries, _}) -> %% If both sides have the same values, some dots may still need to @@ -350,12 +350,12 @@ merge_common_keys(CommonKeys, {LHSClock, LHSEntries, _}, {RHSClock, RHSEntries, %% Perfectly possible that an item in both sets should be dropped case V of [] -> - orddict:erase(Key, Acc); + dict:erase(Key, Acc); _ -> - orddict:store(Key, V, Acc) + dict:store(Key, V, Acc) end end, - orddict:new(), + dict:new(), CommonKeys). -spec equal(orswot(), orswot()) -> boolean(). From e1968e09c43d6003afba93bb8fc18df40af57842 Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Thu, 10 Nov 2016 16:40:40 -0800 Subject: [PATCH 13/15] Convert orswot to use maps --- c_src/Makefile | 1 + src/riak_dt_orswot.erl | 117 +++++++++++++++++++++++++---------------- 2 files changed, 73 insertions(+), 45 deletions(-) diff --git a/c_src/Makefile b/c_src/Makefile index 028c228..8638fc0 100644 --- a/c_src/Makefile +++ b/c_src/Makefile @@ -1,5 +1,6 @@ # Based on c_src.mk from erlang.mk by Loic Hoguin +.PHONY: all CURDIR := $(shell pwd) BASEDIR := $(abspath $(CURDIR)/..) diff --git a/src/riak_dt_orswot.erl b/src/riak_dt_orswot.erl index 46e1b14..fbd3c56 100644 --- a/src/riak_dt_orswot.erl +++ b/src/riak_dt_orswot.erl @@ -90,8 +90,9 @@ -endif. --export_type([orswot/0, orswot_op/0, binary_orswot/0]). +-export_type([legacy_orswot/0, orswot/0, orswot_op/0, binary_orswot/0]). +-opaque legacy_orswot() :: {riak_dt_vclock:vclock(), legacy_entries(), deferred()}. -opaque orswot() :: {riak_dt_vclock:vclock(), entries(), deferred()}. %% Only removes can be deferred, so a list of members to be removed %% per context. @@ -108,7 +109,8 @@ %% a dict of member() -> minimal_clock() mappings. The %% `minimal_clock()' is a more effecient way of storing knowledge %% about adds / removes than a UUID per add. --type entries() :: [{member(), minimal_clock()}]. +-type legacy_entries() ::[{member(), minimal_clock()}]. +-type entries() :: #{member() => minimal_clock()}. %% a minimal clock is just the dots for the element, each dot being an %% actor and event counter for when the element was added. @@ -120,7 +122,7 @@ -spec new() -> orswot(). new() -> - {riak_dt_vclock:fresh(), orddict:new(), orddict:new()}. + {riak_dt_vclock:fresh(), maps:new(), orddict:new()}. %% @doc sets the clock in the Set to that `Clock'. Used by a %% containing Map for sub-CRDTs @@ -128,11 +130,15 @@ new() -> parent_clock(Clock, {_SetClock, Entries, Deferred}) -> {Clock, Entries, Deferred}. --spec value(orswot()) -> [member()]. +-spec value(orswot() | legacy_orswot()) -> [member()]. +value({_Clock, Entries, _Deferred}) when is_list(Entries) -> + [K || {K, _Dots} <- orddict:to_list(Entries)]; value({_Clock, Entries, _Deferred}) -> - [K || {K, _Dots} <- orddict:to_list(Entries)]. + [K || {K, _Dots} <- maps:to_list(Entries)]. --spec value(orswot_q(), orswot()) -> term(). +-spec value(orswot_q(), orswot() | legacy_orswot()) -> term(). +value(size, {_Clock, Entries, _Deferred}) when is_map(Entries) -> + maps:size(Entries); value(size, ORset) -> length(value(ORset)); value({contains, Elem}, ORset) -> @@ -140,15 +146,17 @@ value({contains, Elem}, ORset) -> %% @doc take a list of Set operations and apply them to the set. %% NOTE: either _all_ are applied, or _none_ are. --spec update(orswot_op(), actor() | dot(), orswot()) -> {ok, orswot()} | +-spec update(orswot_op(), actor() | dot(), orswot() | legacy_orswot()) -> {ok, orswot()} | precondition_error(). +update(Op, Actor, {Clock, Entries0, Deferred}) when is_list(Entries0) -> + Entries1 = maps:from_list(Entries0), + update(Op, Actor, {Clock, Entries1, Deferred}); update({update, Ops}, Actor, ORSet) -> apply_ops(Ops, Actor, ORSet); update({add, Elem}, Actor, ORSet) -> {ok, add_elem(Actor, ORSet, Elem)}; -update({remove, Elem}, _Actor, ORSet) -> - {_Clock, Entries, _Deferred} = ORSet, - remove_elem(orddict:find(Elem, Entries), Elem, ORSet); +update({remove, Elem}, _Actor, ORSet = {_Clock, Entries, _Deferred}) -> + remove_elem(maps:find(Elem, Entries), Elem, ORSet); update({add_all, Elems}, Actor, ORSet) -> ORSet2 = lists:foldl(fun(E, S) -> add_elem(Actor, S, E) end, @@ -160,8 +168,11 @@ update({add_all, Elems}, Actor, ORSet) -> update({remove_all, Elems}, Actor, ORSet) -> remove_all(Elems, Actor, ORSet). --spec update(orswot_op(), actor() | dot(), orswot(), riak_dt:context()) -> +-spec update(orswot_op(), actor() | dot(), legacy_orswot() | orswot(), riak_dt:context()) -> {ok, orswot()} | precondition_error(). +update(Op, Actor, {Clock, Entries0, Deferred}, Context) when is_list(Entries0) -> + Entries1 = map:from_list(Entries0), + update(Op, Actor, {Clock, Entries1, Deferred}, Context); update(Op, Actor, ORSet, undefined) -> update(Op, Actor, ORSet); update({add, Elem}, Actor, ORSet, _Ctx) -> @@ -172,14 +183,14 @@ update({remove, Elem}, _Actor, {Clock, Entries, Deferred}, Ctx) -> %% have this element, we can drop any dots it has that the %% Context has seen. Deferred2 = defer_remove(Clock, Ctx, Elem, Deferred), - case orddict:find(Elem, Entries) of + case maps:find(Elem, Entries) of {ok, ElemClock} -> ElemClock2 = riak_dt_vclock:subtract_dots(ElemClock, Ctx), case ElemClock2 of [] -> - {ok, {Clock, orddict:erase(Elem, Entries), Deferred2}}; + {ok, {Clock, maps:remove(Elem, Entries), Deferred2}}; _ -> - {ok, {Clock, orddict:store(Elem, ElemClock2, Entries), Deferred2}} + {ok, {Clock, maps:put(Elem, ElemClock2, Entries), Deferred2}} end; error -> %% Do we not have the element because we removed it @@ -260,7 +271,13 @@ remove_all([Elem | Rest], Actor, ORSet, Ctx) -> {ok, ORSet2} = update({remove, Elem}, Actor, ORSet, Ctx), remove_all(Rest, Actor, ORSet2, Ctx). --spec merge(orswot(), orswot()) -> orswot(). +-spec merge(orswot() | legacy_orswot(), orswot() | legacy_orswot()) -> orswot(). +merge({LHSClock, LHSEntries0, LHSDeferred}, RHS) when is_list(LHSEntries0) -> + LHSEntries1 = maps:from_list(LHSEntries0), + merge({LHSClock, LHSEntries1, LHSDeferred}, RHS); +merge(LHS, {RHSClock, RHSEntries0, RHSDeferred}) when is_list(RHSEntries0) -> + RHSEntries1 = maps:from_list(RHSEntries0), + merge(LHS, {RHSClock, RHSEntries1, RHSDeferred}); merge({Clock, Entries, Deferred}, {Clock, Entries, Deferred}) -> {Clock, Entries, Deferred}; merge({LHSClock, LHSEntries, LHSDeferred}=LHS, {RHSClock, RHSEntries, RHSDeferred}=RHS) -> @@ -268,8 +285,8 @@ merge({LHSClock, LHSEntries, LHSDeferred}=LHS, {RHSClock, RHSEntries, RHSDeferre %% If an element is in both dicts, merge it. If it occurs in one, %% then see if its dots are dominated by the others whole set %% clock. If so, then drop it, if not, keep it. - LHSKeys = sets:from_list(orddict:fetch_keys(LHSEntries)), - RHSKeys = sets:from_list(orddict:fetch_keys(RHSEntries)), + LHSKeys = sets:from_list(maps:keys(LHSEntries)), + RHSKeys = sets:from_list(maps:keys(RHSEntries)), CommonKeys = sets:intersection(LHSKeys, RHSKeys), LHSUnique = sets:subtract(LHSKeys, CommonKeys), RHSUnique = sets:subtract(RHSKeys, CommonKeys), @@ -279,7 +296,7 @@ merge({LHSClock, LHSEntries, LHSDeferred}=LHS, {RHSClock, RHSEntries, RHSDeferre Entries = merge_disjoint_keys(RHSUnique, RHSEntries, LHSClock, Entries0), Deffered = merge_deferred(LHSDeferred, RHSDeferred), - apply_deferred(Clock, dict:to_list(Entries), Deffered). + apply_deferred(Clock, Entries, Deffered). %% @private merge the deffered operations for both sets. -spec merge_deferred(deferred(), deferred()) -> deferred(). @@ -305,18 +322,17 @@ apply_deferred(Clock, Entries, Deferred) -> %% @doc check if each element in `Entries' should be in the merged %% set. --spec merge_disjoint_keys(set(), orddict:orddict(), - riak_dt_vclock:vclock(), dict:dict()) -> dict:dict(). -merge_disjoint_keys(Keys, Entries0, SetClock, Accumulator) -> - Entries1 = dict:from_list(Entries0), +-spec merge_disjoint_keys(set(), entries(), + riak_dt_vclock:vclock(), entries()) -> entries(). +merge_disjoint_keys(Keys, Entries, SetClock, Accumulator) -> sets:fold(fun(Key, Acc) -> - Dots = dict:fetch(Key, Entries1), + Dots = maps:get(Key, Entries), case riak_dt_vclock:descends(SetClock, Dots) of false -> %% Optimise the set of stored dots to %% include only those unseen NewDots = riak_dt_vclock:subtract_dots(Dots, SetClock), - dict:store(Key, NewDots, Acc); + Acc#{Key => NewDots}; true -> Acc end @@ -327,7 +343,7 @@ merge_disjoint_keys(Keys, Entries0, SetClock, Accumulator) -> %% @doc merges the minimal clocks for the common entries in both sets. -spec merge_common_keys(set(), {riak_dt_vclock:vclock(), entries(), deferred()}, {riak_dt_vclock:vclock(), entries(), deferred()}) -> - dict:dict(). + entries(). merge_common_keys(CommonKeys, {LHSClock, LHSEntries, _}, {RHSClock, RHSEntries, _}) -> %% If both sides have the same values, some dots may still need to @@ -338,8 +354,8 @@ merge_common_keys(CommonKeys, {LHSClock, LHSEntries, _}, {RHSClock, RHSEntries, %% dominated by the other sides clock sets:fold(fun(Key, Acc) -> - V1 = orddict:fetch(Key, LHSEntries), - V2 = orddict:fetch(Key, RHSEntries), + V1 = maps:get(Key, LHSEntries), + V2 = maps:get(Key, RHSEntries), CommonDots = sets:intersection(sets:from_list(V1), sets:from_list(V2)), LHSUnique = sets:to_list(sets:subtract(sets:from_list(V1), CommonDots)), @@ -350,28 +366,39 @@ merge_common_keys(CommonKeys, {LHSClock, LHSEntries, _}, {RHSClock, RHSEntries, %% Perfectly possible that an item in both sets should be dropped case V of [] -> - dict:erase(Key, Acc); + maps:remove(Key, Acc); _ -> - dict:store(Key, V, Acc) + maps:put(Key, V, Acc) end end, - dict:new(), + maps:new(), CommonKeys). --spec equal(orswot(), orswot()) -> boolean(). +-spec equal(orswot() | legacy_orswot(), orswot() | legacy_orswot()) -> boolean(). +equal({Clock, Entries0, Deferred}, Rhs) when is_list(Entries0) -> + Entries1 = maps:from_list(Entries0), + equal({Clock, Entries1, Deferred}, Rhs); +equal(Lhs, {Clock, Entries0, Deferred}) when is_list(Entries0) -> + Entries1 = maps:from_list(Entries0), + equal(Lhs, {Clock, Entries1, Deferred}); equal({Clock1, Entries1, _}, {Clock2, Entries2, _}) -> riak_dt_vclock:equal(Clock1, Clock2) andalso - orddict:fetch_keys(Entries1) == orddict:fetch_keys(Entries2) andalso + Entries1 == Entries2 andalso clocks_equal(Entries1, Entries2). --spec clocks_equal(orddict:orddict(), orddict:orddict()) -> boolean(). -clocks_equal([], _) -> +-spec clocks_equal(entries(), entries()) -> boolean(). +clocks_equal(EntriesLhs, EntriesRhs) -> + KeysLhs = maps:keys(EntriesLhs), + clocks_equal(KeysLhs, EntriesLhs, EntriesRhs). + +clocks_equal([], _, _) -> true; -clocks_equal([{Elem, Clock1} | Rest], Entries2) -> - Clock2 = orddict:fetch(Elem, Entries2), - case riak_dt_vclock:equal(Clock1, Clock2) of +clocks_equal([Elem|Rest], EntriesLhs, EntriesRhs) -> + ClockLhs = maps:get(Elem, EntriesLhs), + ClockRhs = maps:get(Elem, EntriesRhs), + case riak_dt_vclock:equal(ClockLhs, ClockRhs) of true -> - clocks_equal(Rest, Entries2); + clocks_equal(Rest, EntriesLhs, EntriesRhs); false -> false end. @@ -379,18 +406,18 @@ clocks_equal([{Elem, Clock1} | Rest], Entries2) -> %% Private -spec add_elem(actor() | dot(), orswot(), member()) -> orswot(). add_elem(Dot, {Clock, Entries, Deferred}, Elem) when is_tuple(Dot) -> - {riak_dt_vclock:merge([Clock, [Dot]]), orddict:store(Elem, [Dot], Entries), Deferred}; + {riak_dt_vclock:merge([Clock, [Dot]]), maps:put(Elem, [Dot], Entries), Deferred}; add_elem(Actor, {Clock, Entries, Deferred}, Elem) -> NewClock = riak_dt_vclock:increment(Actor, Clock), Dot = [{Actor, riak_dt_vclock:get_counter(Actor, NewClock)}], - {NewClock, orddict:store(Elem, Dot, Entries), Deferred}. + {NewClock, maps:put(Elem, Dot, Entries), Deferred}. -spec remove_elem({ok, riak_dt_vclock:vclock()} | error, - member(), {riak_dt_vclock:vclock(), orddict:orddict(), deferred()}) -> + member(), orswot()) -> {ok, {riak_dt_vclock:vclock(), orddict:orddict(), deferred()}} | precondition_error(). -remove_elem({ok, _VClock}, Elem, {Clock, Dict, Deferred}) -> - {ok, {Clock, orddict:erase(Elem, Dict), Deferred}}; +remove_elem({ok, _VClock}, Elem, {Clock, Entries, Deferred}) -> + {ok, {Clock, maps:remove(Elem, Entries), Deferred}}; remove_elem(_, Elem, _ORSet) -> {error, {precondition, {not_present, Elem}}}. @@ -414,9 +441,9 @@ stats(ORSWOT) -> stat(actor_count, {Clock, _Dict, _}) -> length(Clock); stat(element_count, {_Clock, Dict, _}) -> - orddict:size(Dict); + maps:size(Dict); stat(max_dot_length, {_Clock, Dict, _}) -> - orddict:fold(fun(_K, Dots, Acc) -> + maps:fold(fun(_K, Dots, Acc) -> max(length(Dots), Acc) end, 0, Dict); stat(deferred_length, {_Clock, _Dict, Deferred}) -> From 85a9eabec8aca58af2d314a20ae9d43433e58237 Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Wed, 16 Nov 2016 00:20:13 -0800 Subject: [PATCH 14/15] More performance optimizations --- src/riak_dt_orswot.erl | 78 ++++++++++++++++++++++++------------------ 1 file changed, 45 insertions(+), 33 deletions(-) diff --git a/src/riak_dt_orswot.erl b/src/riak_dt_orswot.erl index fbd3c56..cc7e7fe 100644 --- a/src/riak_dt_orswot.erl +++ b/src/riak_dt_orswot.erl @@ -120,6 +120,8 @@ -type precondition_error() :: {error, {precondition ,{not_present, member()}}}. +-define(EMPTY_ORSWOT, {[],#{},[]}). + -spec new() -> orswot(). new() -> {riak_dt_vclock:fresh(), maps:new(), orddict:new()}. @@ -280,23 +282,25 @@ merge(LHS, {RHSClock, RHSEntries0, RHSDeferred}) when is_list(RHSEntries0) -> merge(LHS, {RHSClock, RHSEntries1, RHSDeferred}); merge({Clock, Entries, Deferred}, {Clock, Entries, Deferred}) -> {Clock, Entries, Deferred}; +merge({Clock, Entries, LHSDeferred}, {Clock, Entries, RHSDeferred}) -> + Deffered = merge_deferred(LHSDeferred, RHSDeferred), + apply_deferred(Clock, Entries, Deffered); +merge(?EMPTY_ORSWOT, RHS) -> RHS; +merge(LHS, ?EMPTY_ORSWOT) -> LHS; merge({LHSClock, LHSEntries, LHSDeferred}=LHS, {RHSClock, RHSEntries, RHSDeferred}=RHS) -> Clock = riak_dt_vclock:merge([LHSClock, RHSClock]), %% If an element is in both dicts, merge it. If it occurs in one, %% then see if its dots are dominated by the others whole set %% clock. If so, then drop it, if not, keep it. - LHSKeys = sets:from_list(maps:keys(LHSEntries)), - RHSKeys = sets:from_list(maps:keys(RHSEntries)), - CommonKeys = sets:intersection(LHSKeys, RHSKeys), - LHSUnique = sets:subtract(LHSKeys, CommonKeys), - RHSUnique = sets:subtract(RHSKeys, CommonKeys), - Entries00 = merge_common_keys(CommonKeys, LHS, RHS), + LHSUnique = maps:filter(fun(Key, _Value) -> not maps:is_key(Key, RHSEntries) end, LHSEntries), + RHSUnique = maps:filter(fun(Key, _Value) -> not maps:is_key(Key, LHSEntries) end, RHSEntries), + Entries0 = merge_common_keys(LHS, RHS), - Entries0 = merge_disjoint_keys(LHSUnique, LHSEntries, RHSClock, Entries00), - Entries = merge_disjoint_keys(RHSUnique, RHSEntries, LHSClock, Entries0), + Entries1 = merge_disjoint_keys(LHSUnique, LHSEntries, RHSClock, Entries0), + Entries2 = merge_disjoint_keys(RHSUnique, RHSEntries, LHSClock, Entries1), Deffered = merge_deferred(LHSDeferred, RHSDeferred), - apply_deferred(Clock, Entries, Deffered). + apply_deferred(Clock, Entries2, Deffered). %% @private merge the deffered operations for both sets. -spec merge_deferred(deferred(), deferred()) -> deferred(). @@ -325,7 +329,7 @@ apply_deferred(Clock, Entries, Deferred) -> -spec merge_disjoint_keys(set(), entries(), riak_dt_vclock:vclock(), entries()) -> entries(). merge_disjoint_keys(Keys, Entries, SetClock, Accumulator) -> - sets:fold(fun(Key, Acc) -> + maps:fold(fun(Key, _Val, Acc) -> Dots = maps:get(Key, Entries), case riak_dt_vclock:descends(SetClock, Dots) of false -> @@ -341,10 +345,10 @@ merge_disjoint_keys(Keys, Entries, SetClock, Accumulator) -> Keys). %% @doc merges the minimal clocks for the common entries in both sets. --spec merge_common_keys(set(), {riak_dt_vclock:vclock(), entries(), deferred()}, +-spec merge_common_keys({riak_dt_vclock:vclock(), entries(), deferred()}, {riak_dt_vclock:vclock(), entries(), deferred()}) -> entries(). -merge_common_keys(CommonKeys, {LHSClock, LHSEntries, _}, {RHSClock, RHSEntries, _}) -> +merge_common_keys({LHSClock, LHSEntries, _}, {RHSClock, RHSEntries, _}) -> %% If both sides have the same values, some dots may still need to %% be shed. If LHS has dots for 'X' that RHS does _not_ have, and @@ -352,27 +356,33 @@ merge_common_keys(CommonKeys, {LHSClock, LHSEntries, _}, {RHSClock, RHSEntries, %% dots. We only keep dots BOTH side agree on, or dots that are %% not dominated. Keep only common dots, and dots that are not %% dominated by the other sides clock - - sets:fold(fun(Key, Acc) -> - V1 = maps:get(Key, LHSEntries), - V2 = maps:get(Key, RHSEntries), - - CommonDots = sets:intersection(sets:from_list(V1), sets:from_list(V2)), - LHSUnique = sets:to_list(sets:subtract(sets:from_list(V1), CommonDots)), - RHSUnique = sets:to_list(sets:subtract(sets:from_list(V2), CommonDots)), - LHSKeep = riak_dt_vclock:subtract_dots(LHSUnique, RHSClock), - RHSKeep = riak_dt_vclock:subtract_dots(RHSUnique, LHSClock), - V = riak_dt_vclock:merge([sets:to_list(CommonDots), LHSKeep, RHSKeep]), - %% Perfectly possible that an item in both sets should be dropped - case V of - [] -> - maps:remove(Key, Acc); - _ -> - maps:put(Key, V, Acc) - end - end, - maps:new(), - CommonKeys). + LHSKeys = maps:keys(LHSEntries), + lists:foldl( + fun(Key, Acc) -> + case maps:is_key(Key, RHSEntries) of + true -> + V1 = maps:get(Key, LHSEntries), + V2 = maps:get(Key, RHSEntries), + + CommonDots = sets:intersection(sets:from_list(V1), sets:from_list(V2)), + LHSUnique = sets:to_list(sets:subtract(sets:from_list(V1), CommonDots)), + RHSUnique = sets:to_list(sets:subtract(sets:from_list(V2), CommonDots)), + LHSKeep = riak_dt_vclock:subtract_dots(LHSUnique, RHSClock), + RHSKeep = riak_dt_vclock:subtract_dots(RHSUnique, LHSClock), + V = riak_dt_vclock:merge([sets:to_list(CommonDots), LHSKeep, RHSKeep]), + %% Perfectly possible that an item in both sets should be dropped + case V of + [] -> + maps:remove(Key, Acc); + _ -> + maps:put(Key, V, Acc) + end; + false -> + Acc + end + end, + maps:new(), + LHSKeys). -spec equal(orswot() | legacy_orswot(), orswot() | legacy_orswot()) -> boolean(). equal({Clock, Entries0, Deferred}, Rhs) when is_list(Entries0) -> @@ -479,6 +489,8 @@ from_binary(<>) -> %% =================================================================== -ifdef(TEST). +empty_orswot_test() -> + ?assertEqual(new(), ?EMPTY_ORSWOT). stat_test() -> Set = new(), {ok, Set1} = update({add, <<"foo">>}, 1, Set), From f217b94b6283cc0ebf6257d6fa2d36e7efcf831f Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Mon, 21 Nov 2016 21:49:45 -0800 Subject: [PATCH 15/15] Minor fixes --- src/riak_dt_orswot.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/riak_dt_orswot.erl b/src/riak_dt_orswot.erl index cc7e7fe..c4da8bb 100644 --- a/src/riak_dt_orswot.erl +++ b/src/riak_dt_orswot.erl @@ -282,11 +282,11 @@ merge(LHS, {RHSClock, RHSEntries0, RHSDeferred}) when is_list(RHSEntries0) -> merge(LHS, {RHSClock, RHSEntries1, RHSDeferred}); merge({Clock, Entries, Deferred}, {Clock, Entries, Deferred}) -> {Clock, Entries, Deferred}; +merge(?EMPTY_ORSWOT, RHS) -> RHS; +merge(LHS, ?EMPTY_ORSWOT) -> LHS; merge({Clock, Entries, LHSDeferred}, {Clock, Entries, RHSDeferred}) -> Deffered = merge_deferred(LHSDeferred, RHSDeferred), apply_deferred(Clock, Entries, Deffered); -merge(?EMPTY_ORSWOT, RHS) -> RHS; -merge(LHS, ?EMPTY_ORSWOT) -> LHS; merge({LHSClock, LHSEntries, LHSDeferred}=LHS, {RHSClock, RHSEntries, RHSDeferred}=RHS) -> Clock = riak_dt_vclock:merge([LHSClock, RHSClock]), %% If an element is in both dicts, merge it. If it occurs in one, @@ -326,7 +326,7 @@ apply_deferred(Clock, Entries, Deferred) -> %% @doc check if each element in `Entries' should be in the merged %% set. --spec merge_disjoint_keys(set(), entries(), +-spec merge_disjoint_keys(map(), entries(), riak_dt_vclock:vclock(), entries()) -> entries(). merge_disjoint_keys(Keys, Entries, SetClock, Accumulator) -> maps:fold(fun(Key, _Val, Acc) -> @@ -424,7 +424,7 @@ add_elem(Actor, {Clock, Entries, Deferred}, Elem) -> -spec remove_elem({ok, riak_dt_vclock:vclock()} | error, member(), orswot()) -> - {ok, {riak_dt_vclock:vclock(), orddict:orddict(), deferred()}} | + {ok, orswot()} | precondition_error(). remove_elem({ok, _VClock}, Elem, {Clock, Entries, Deferred}) -> {ok, {Clock, maps:remove(Elem, Entries), Deferred}};