From d44770afb029e81b085277ff97fa74c68bcb1882 Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Wed, 23 Sep 2015 05:30:25 -0700 Subject: [PATCH 1/3] Add code to allow responses from kmodify - pass 1 --- include/riak_ensemble_types.hrl | 1 + src/riak_ensemble_peer.erl | 74 +++++++++++++++++++++++++++++++-- 2 files changed, 71 insertions(+), 4 deletions(-) diff --git a/include/riak_ensemble_types.hrl b/include/riak_ensemble_types.hrl index 35b1fe0..3929f46 100644 --- a/include/riak_ensemble_types.hrl +++ b/include/riak_ensemble_types.hrl @@ -6,6 +6,7 @@ -type peer_change() :: term(). %% FIXME -type change_error() :: already_member | not_member. -type std_reply() :: timeout | failed | unavailable | nack | {ok, term()}. +-type extended_reply() :: std_reply() | {error, term()}. -type maybe_pid() :: pid() | undefined. -type peer_pids() :: [{peer_id(), maybe_pid()}]. -type peer_reply() :: {peer_id(), term()}. diff --git a/src/riak_ensemble_peer.erl b/src/riak_ensemble_peer.erl index 0918db6..49f4946 100644 --- a/src/riak_ensemble_peer.erl +++ b/src/riak_ensemble_peer.erl @@ -29,7 +29,7 @@ -export([start_link/4, start/4]). -export([join/2, join/3, update_members/3, get_leader/1, backend_pong/1]). -export([kget/4, kget/5, kupdate/6, kput_once/5, kover/5, kmodify/6, kdelete/4, - ksafe_delete/5, obj_value/2, obj_value/3]). + ksafe_delete/5, obj_value/2, obj_value/3, kmodify_extended/6]). -export([setup/2]). -export([probe/2, election/2, prepare/2, leading/2, following/2, probe/3, election/3, prepare/3, leading/3, following/3]). @@ -44,7 +44,7 @@ get_info/1, stable_views/2, tree_info/1]). %% Exported internal callback functions --export([do_kupdate/4, do_kput_once/4, do_kmodify/4]). +-export([do_kupdate/4, do_kput_once/4, do_kmodify/4, do_kmodify_extended/4]). -compile({pulse_replace_module, [{gen_fsm, pulse_gen_fsm}]}). @@ -113,6 +113,7 @@ -type maybe_peer_id() :: undefined | peer_id(). -type modify_fun() :: fun() | {module(), atom(), term()}. + -record(state, {id :: peer_id(), ensemble :: ensemble_id(), ets :: ets:tid(), @@ -227,6 +228,7 @@ kget(Node, Target, Key, Timeout, Opts) -> ?OUT("get(~p): ~p~n", [Key, Result]), Result. + -spec kupdate(node(), target(), key(), obj(), term(), timeout()) -> std_reply(). kupdate(Node, Target, Key, Current, New, Timeout) -> F = fun ?MODULE:do_kupdate/4, @@ -291,6 +293,31 @@ do_kmodify(Obj, NextSeq, State, [ModFun, Default]) -> _ -> {ok, set_obj(value, New, Obj, State)} end. +kmodify_extended(Node, Target, Key, ModFun, Default, Timeout) -> + F = fun ?MODULE:do_kmodify_extended/4, + Result = riak_ensemble_router:sync_send_event(Node, Target, {put, Key, F, [ModFun, Default]}, Timeout), + ?OUT("kmodify(~p): ~p~n", [Key, Result]), + Result. + +do_kmodify_extended(Obj, NextSeq, State, [ModFun, Default]) -> + Value = get_value(Obj, Default, State), + Vsn = {epoch(State), NextSeq}, + New = case ModFun of + {Mod, Fun, Args} -> + Mod:Fun(Vsn, Value, Args); + _ -> + ModFun(Vsn, Value) + end, + case New of + failed -> + failed; + {failed, _} -> + New; + {reply, Reply, NewValue} -> + {reply, Reply, set_obj(value, NewValue, Obj, State)}; + _ -> + {ok, set_obj(value, New, Obj, State)} + end. -spec kdelete(node(), target(), key(), timeout()) -> std_reply(). kdelete(Node, Target, Key, Timeout) -> @@ -1326,7 +1353,9 @@ send_reply(From, Reply) -> failed -> ok; unavailable -> ok; nack -> ok; - {ok,_} -> ok + {ok,_} -> ok; + {reply, _} -> ok; + {failed, _} -> ok end, gen_fsm:reply(From, Reply), ok. @@ -1365,11 +1394,35 @@ do_put_fsm(Key, Fun, Args, From, Self, KnownHash, State) -> end end. +do_modify_fsm(Key = {{<<"cp">>, <<"bucket">>}, _}, Current, Fun, Args, From, State=#state{self=Self}) -> + io:format("New Code path (3)~n"), + case modify_key(Key, Current, Fun, Args, State) of + {ok, New, _State2} -> + send_reply(From, {ok, New}); + {reply, Message, _State2} -> + io:format("New Code path (3!!)~n"), + send_reply(From, {reply, Message}); + {failed, Message, _State2} -> + send_reply(From, {failed, Message}); + {corrupted, _State2} -> + send_reply(From, failed), + gen_fsm:sync_send_event(Self, tree_corrupted, infinity); + {precondition, _State2} -> + send_reply(From, failed); + {failed, _State2} -> + gen_fsm:sync_send_event(Self, request_failed, infinity), + send_reply(From, timeout) + end; %% -spec do_modify_fsm(_,_,fun((_,_) -> any()),{_,_},state()) -> ok. do_modify_fsm(Key, Current, Fun, Args, From, State=#state{self=Self}) -> + %io:format("running modify FSM: ~p~n", [modify_key(Key, Current, Fun, Args, State)]), case modify_key(Key, Current, Fun, Args, State) of {ok, New, _State2} -> send_reply(From, {ok, New}); + {reply, Message, _State2} -> + send_reply(From, {reply, Message}); + {failed, Message, _State2} -> + send_reply(From, {failed, Message}); {corrupted, _State2} -> send_reply(From, failed), gen_fsm:sync_send_event(Self, tree_corrupted, infinity); @@ -1555,6 +1608,7 @@ modify_key(Key, Current, Fun, Args, State) -> end, case FunResult of {ok, New} -> + io:format("Doing put of new obj: ~p~n", [New]), case put_obj(Key, New, Seq, State) of {ok, Result, State2} -> {ok, Result, State2}; @@ -1564,7 +1618,19 @@ modify_key(Key, Current, Fun, Args, State) -> {failed, State2} end; failed -> - {precondition, State} + {precondition, State}; + {reply, Message, New} -> + io:format("Doing put of new obj (2): ~p~n", [New]), + case put_obj(Key, New, Seq, State) of + {ok, _Result, State2} -> + {reply, Message, State2}; + {corrupted, State2} -> + {corrupted, State2}; + {failed, State2} -> + {failed, State2} + end; + {failed, Message} -> + {failed, Message, State} end. -spec get_latest_obj(_,_,_,state()) -> {ok, obj(), _, state()} | {failed, state()}. From a1a79e9c17f8e344ad92b117e61b2b3b3e7eb77a Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Wed, 23 Sep 2015 05:34:44 -0700 Subject: [PATCH 2/3] Cleanup extra logging --- include/riak_ensemble_types.hrl | 1 - src/riak_ensemble_peer.erl | 47 ++------------------------------- 2 files changed, 2 insertions(+), 46 deletions(-) diff --git a/include/riak_ensemble_types.hrl b/include/riak_ensemble_types.hrl index 3929f46..35b1fe0 100644 --- a/include/riak_ensemble_types.hrl +++ b/include/riak_ensemble_types.hrl @@ -6,7 +6,6 @@ -type peer_change() :: term(). %% FIXME -type change_error() :: already_member | not_member. -type std_reply() :: timeout | failed | unavailable | nack | {ok, term()}. --type extended_reply() :: std_reply() | {error, term()}. -type maybe_pid() :: pid() | undefined. -type peer_pids() :: [{peer_id(), maybe_pid()}]. -type peer_reply() :: {peer_id(), term()}. diff --git a/src/riak_ensemble_peer.erl b/src/riak_ensemble_peer.erl index 49f4946..7f1255c 100644 --- a/src/riak_ensemble_peer.erl +++ b/src/riak_ensemble_peer.erl @@ -29,7 +29,7 @@ -export([start_link/4, start/4]). -export([join/2, join/3, update_members/3, get_leader/1, backend_pong/1]). -export([kget/4, kget/5, kupdate/6, kput_once/5, kover/5, kmodify/6, kdelete/4, - ksafe_delete/5, obj_value/2, obj_value/3, kmodify_extended/6]). + ksafe_delete/5, obj_value/2, obj_value/3]). -export([setup/2]). -export([probe/2, election/2, prepare/2, leading/2, following/2, probe/3, election/3, prepare/3, leading/3, following/3]). @@ -44,7 +44,7 @@ get_info/1, stable_views/2, tree_info/1]). %% Exported internal callback functions --export([do_kupdate/4, do_kput_once/4, do_kmodify/4, do_kmodify_extended/4]). +-export([do_kupdate/4, do_kput_once/4, do_kmodify/4]). -compile({pulse_replace_module, [{gen_fsm, pulse_gen_fsm}]}). @@ -279,27 +279,6 @@ kmodify(Node, Target, Key, ModFun, Default, Timeout) -> Result. do_kmodify(Obj, NextSeq, State, [ModFun, Default]) -> - Value = get_value(Obj, Default, State), - Vsn = {epoch(State), NextSeq}, - New = case ModFun of - {Mod, Fun, Args} -> - Mod:Fun(Vsn, Value, Args); - _ -> - ModFun(Vsn, Value) - end, - case New of - failed -> - failed; - _ -> - {ok, set_obj(value, New, Obj, State)} - end. -kmodify_extended(Node, Target, Key, ModFun, Default, Timeout) -> - F = fun ?MODULE:do_kmodify_extended/4, - Result = riak_ensemble_router:sync_send_event(Node, Target, {put, Key, F, [ModFun, Default]}, Timeout), - ?OUT("kmodify(~p): ~p~n", [Key, Result]), - Result. - -do_kmodify_extended(Obj, NextSeq, State, [ModFun, Default]) -> Value = get_value(Obj, Default, State), Vsn = {epoch(State), NextSeq}, New = case ModFun of @@ -1394,28 +1373,8 @@ do_put_fsm(Key, Fun, Args, From, Self, KnownHash, State) -> end end. -do_modify_fsm(Key = {{<<"cp">>, <<"bucket">>}, _}, Current, Fun, Args, From, State=#state{self=Self}) -> - io:format("New Code path (3)~n"), - case modify_key(Key, Current, Fun, Args, State) of - {ok, New, _State2} -> - send_reply(From, {ok, New}); - {reply, Message, _State2} -> - io:format("New Code path (3!!)~n"), - send_reply(From, {reply, Message}); - {failed, Message, _State2} -> - send_reply(From, {failed, Message}); - {corrupted, _State2} -> - send_reply(From, failed), - gen_fsm:sync_send_event(Self, tree_corrupted, infinity); - {precondition, _State2} -> - send_reply(From, failed); - {failed, _State2} -> - gen_fsm:sync_send_event(Self, request_failed, infinity), - send_reply(From, timeout) - end; %% -spec do_modify_fsm(_,_,fun((_,_) -> any()),{_,_},state()) -> ok. do_modify_fsm(Key, Current, Fun, Args, From, State=#state{self=Self}) -> - %io:format("running modify FSM: ~p~n", [modify_key(Key, Current, Fun, Args, State)]), case modify_key(Key, Current, Fun, Args, State) of {ok, New, _State2} -> send_reply(From, {ok, New}); @@ -1608,7 +1567,6 @@ modify_key(Key, Current, Fun, Args, State) -> end, case FunResult of {ok, New} -> - io:format("Doing put of new obj: ~p~n", [New]), case put_obj(Key, New, Seq, State) of {ok, Result, State2} -> {ok, Result, State2}; @@ -1620,7 +1578,6 @@ modify_key(Key, Current, Fun, Args, State) -> failed -> {precondition, State}; {reply, Message, New} -> - io:format("Doing put of new obj (2): ~p~n", [New]), case put_obj(Key, New, Seq, State) of {ok, _Result, State2} -> {reply, Message, State2}; From 8a6af58bd2b07a2ea8655d906511def80adcaf24 Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Wed, 23 Sep 2015 13:21:10 -0700 Subject: [PATCH 3/3] Simplify the kmodify code to not have the 'fail' code, and in addition, allow for selective modification based on the result of the callback --- src/riak_ensemble_peer.erl | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/riak_ensemble_peer.erl b/src/riak_ensemble_peer.erl index 7f1255c..f769f34 100644 --- a/src/riak_ensemble_peer.erl +++ b/src/riak_ensemble_peer.erl @@ -290,10 +290,10 @@ do_kmodify(Obj, NextSeq, State, [ModFun, Default]) -> case New of failed -> failed; - {failed, _} -> - New; {reply, Reply, NewValue} -> {reply, Reply, set_obj(value, NewValue, Obj, State)}; + {reply_noreplicate, Reply} -> + {reply_noreplicate, Reply}; _ -> {ok, set_obj(value, New, Obj, State)} end. @@ -1333,8 +1333,7 @@ send_reply(From, Reply) -> unavailable -> ok; nack -> ok; {ok,_} -> ok; - {reply, _} -> ok; - {failed, _} -> ok + {reply, _} -> ok end, gen_fsm:reply(From, Reply), ok. @@ -1380,8 +1379,6 @@ do_modify_fsm(Key, Current, Fun, Args, From, State=#state{self=Self}) -> send_reply(From, {ok, New}); {reply, Message, _State2} -> send_reply(From, {reply, Message}); - {failed, Message, _State2} -> - send_reply(From, {failed, Message}); {corrupted, _State2} -> send_reply(From, failed), gen_fsm:sync_send_event(Self, tree_corrupted, infinity); @@ -1586,8 +1583,9 @@ modify_key(Key, Current, Fun, Args, State) -> {failed, State2} -> {failed, State2} end; - {failed, Message} -> - {failed, Message, State} + {reply_noreplicate, Message} -> + {reply, Message, State} + end. -spec get_latest_obj(_,_,_,state()) -> {ok, obj(), _, state()} | {failed, state()}.