diff --git a/src/riak_ensemble_peer.erl b/src/riak_ensemble_peer.erl index 0918db6..f769f34 100644 --- a/src/riak_ensemble_peer.erl +++ b/src/riak_ensemble_peer.erl @@ -113,6 +113,7 @@ -type maybe_peer_id() :: undefined | peer_id(). -type modify_fun() :: fun() | {module(), atom(), term()}. + -record(state, {id :: peer_id(), ensemble :: ensemble_id(), ets :: ets:tid(), @@ -227,6 +228,7 @@ kget(Node, Target, Key, Timeout, Opts) -> ?OUT("get(~p): ~p~n", [Key, Result]), Result. + -spec kupdate(node(), target(), key(), obj(), term(), timeout()) -> std_reply(). kupdate(Node, Target, Key, Current, New, Timeout) -> F = fun ?MODULE:do_kupdate/4, @@ -288,6 +290,10 @@ do_kmodify(Obj, NextSeq, State, [ModFun, Default]) -> case New of failed -> failed; + {reply, Reply, NewValue} -> + {reply, Reply, set_obj(value, NewValue, Obj, State)}; + {reply_noreplicate, Reply} -> + {reply_noreplicate, Reply}; _ -> {ok, set_obj(value, New, Obj, State)} end. @@ -1326,7 +1332,8 @@ send_reply(From, Reply) -> failed -> ok; unavailable -> ok; nack -> ok; - {ok,_} -> ok + {ok,_} -> ok; + {reply, _} -> ok end, gen_fsm:reply(From, Reply), ok. @@ -1370,6 +1377,8 @@ do_modify_fsm(Key, Current, Fun, Args, From, State=#state{self=Self}) -> case modify_key(Key, Current, Fun, Args, State) of {ok, New, _State2} -> send_reply(From, {ok, New}); + {reply, Message, _State2} -> + send_reply(From, {reply, Message}); {corrupted, _State2} -> send_reply(From, failed), gen_fsm:sync_send_event(Self, tree_corrupted, infinity); @@ -1564,7 +1573,19 @@ modify_key(Key, Current, Fun, Args, State) -> {failed, State2} end; failed -> - {precondition, State} + {precondition, State}; + {reply, Message, New} -> + case put_obj(Key, New, Seq, State) of + {ok, _Result, State2} -> + {reply, Message, State2}; + {corrupted, State2} -> + {corrupted, State2}; + {failed, State2} -> + {failed, State2} + end; + {reply_noreplicate, Message} -> + {reply, Message, State} + end. -spec get_latest_obj(_,_,_,state()) -> {ok, obj(), _, state()} | {failed, state()}.