Skip to content

Commit

Permalink
Merge pull request #308 from basho/feature/rz/more-aae-ports
Browse files Browse the repository at this point in the history
More AAE ports
  • Loading branch information
rzezeski committed Feb 18, 2014
2 parents 2c9c40f + 5c21cab commit 6b31d32
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 21 deletions.
25 changes: 25 additions & 0 deletions riak_test/aae_test.erl
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ confirm() ->

_ = verify_no_repair_after_restart(Cluster),

_ = verify_exchange_after_expire(Cluster),

pass.

%% @doc Create a tuple containing a riak object and the first
Expand Down Expand Up @@ -145,6 +147,29 @@ setup_index(Cluster, PBConn, YZBenchDir) ->
ok = yz_rt:wait_for_index(Cluster, ?INDEX),
ok = yz_rt:set_index(Node, ?BUCKET, ?INDEX).

%% @doc Verify that expired trees do not prevent exchange from
%% occurring (like clearing trees does).
%%
%% The check proceeds as follows, applying each step to every node in
%% `Cluster' over RPC:
%%
%%   1. Set `riak_kv''s `anti_entropy_build_limit' to `{0, 1000}' so
%%      that no tree can be (re)built while the check runs.
%%   2. Ask `yz_entropy_mgr' to expire all of its trees.
%%   3. Wait for a full exchange round that starts after the expire.
%%   4. Set the build limit to `{100, 1000}' so trees can build again.
%%
%% Any step that does not return `ok' fails the enclosing match and
%% crashes the test.
-spec verify_exchange_after_expire([node()]) -> ok.
verify_exchange_after_expire(Cluster) ->
    lager:info("Expire all trees and verify exchange still happens"),
    lager:info("Set anti_entropy_build_limit to 0 so that trees can't be built"),
    lists:foreach(
      fun(Node) ->
              ok = rpc:call(Node, application, set_env,
                            [riak_kv, anti_entropy_build_limit, {0, 1000}])
      end,
      Cluster),

    lager:info("Expire all trees"),
    lists:foreach(
      fun(Node) ->
              ok = rpc:call(Node, yz_entropy_mgr, expire_trees, [])
      end,
      Cluster),

    %% The expire is async so just give it a moment
    timer:sleep(100),

    %% os:timestamp/0 yields the same {MegaSecs, Secs, MicroSecs} tuple
    %% as the deprecated now/0 BIF, without the deprecation warning.
    ok = yz_rt:wait_for_full_exchange_round(Cluster, os:timestamp()),

    lager:info("Set anti_entropy_build_limit to 100 so trees can build again"),
    lists:foreach(
      fun(Node) ->
              ok = rpc:call(Node, application, set_env,
                            [riak_kv, anti_entropy_build_limit, {100, 1000}])
      end,
      Cluster),
    ok.

%% @doc Verify that Yokozuna deletes postings which have no
%% corresponding KV object.
-spec verify_removal_of_orphan_postings([node()]) -> ok.
Expand Down
31 changes: 28 additions & 3 deletions src/yz_entropy_mgr.erl
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,16 @@ cancel_exchanges() ->
clear_trees() ->
gen_server:cast(?MODULE, clear_trees).

%% @doc Expire all the trees.
%%
%% Expiring differs from clearing. An expired tree can still be
%% exchanged up until the point it is rebuilt. Clearing destroys the
%% tree and prevents exchange from occurring until the tree has been
%% rebuilt, which can take a long time.
%%
%% The request is delivered to the entropy manager with
%% `gen_server:cast/2', so this function returns `ok' immediately,
%% before the trees have actually been expired.
%%
%% @see clear_trees/0
-spec expire_trees() -> ok.
expire_trees() ->
gen_server:cast(?MODULE, expire_trees).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
Expand Down Expand Up @@ -177,9 +187,15 @@ handle_call(disable, _From, S) ->
[yz_index_hashtree:stop(T) || {_,T} <- S#state.trees],
{reply, ok, S};

handle_call({set_mode, Mode}, _From, S) ->
S2 = S#state{mode=Mode},
{reply, ok, S2};
handle_call({set_mode, NewMode}, _From, S=#state{mode=CurrentMode}) ->
S2 = case {CurrentMode, NewMode} of
{automatic, manual} ->
S#state{exchange_queue=[]};
_ ->
S
end,
S3 = S2#state{mode=NewMode},
{reply, ok, S3};

handle_call(Request, From, S) ->
lager:warning("Unexpected call: ~p from ~p", [Request, From]),
Expand All @@ -198,6 +214,10 @@ handle_cast(clear_trees, S) ->
clear_all_trees(S#state.trees),
{noreply, S};

handle_cast(expire_trees, S) ->
ok = expire_all_trees(S#state.trees),
{noreply, S};

handle_cast(_Msg, S) ->
lager:warning("Unexpected cast: ~p", [_Msg]),
{noreply, S}.
Expand Down Expand Up @@ -264,6 +284,11 @@ clear_all_exchanges(Exchanges) ->
clear_all_trees(Trees) ->
[yz_index_hashtree:clear(TPid) || {_, TPid} <- Trees].

%% @doc Expire every hashtree in `Trees', one after another in list
%% order. Only the second element of each 2-tuple entry is used; the
%% individual results of `yz_index_hashtree:expire/1' are discarded and
%% `ok' is always returned.
-spec expire_all_trees(trees()) -> ok.
expire_all_trees(Trees) ->
    TreePids = [TreePid || {_, TreePid} <- Trees],
    lists:foreach(fun(TreePid) -> yz_index_hashtree:expire(TreePid) end,
                  TreePids).

%% Arm a one-shot timer that delivers `reset_build_tokens' to the
%% calling process. The delay is the second element of the
%% ?YZ_ENTROPY_BUILD_LIMIT pair; the first element is not used here.
%% Returns the timer reference produced by erlang:send_after/3.
schedule_reset_build_tokens() ->
    {_Unused, ResetDelay} = ?YZ_ENTROPY_BUILD_LIMIT,
    Self = self(),
    erlang:send_after(ResetDelay, Self, reset_build_tokens).
Expand Down
91 changes: 73 additions & 18 deletions src/yz_index_hashtree.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

-record(state, {index,
built,
expired :: boolean(),
lock :: undefined | reference(),
path,
build_time,
Expand Down Expand Up @@ -106,6 +107,10 @@ poke(Tree) ->
clear(Tree) ->
gen_server:cast(Tree, clear).

%% @doc Expire the tree.
%%
%% Sends a synchronous `expire' request to the `Tree' process, which
%% marks its state as expired and replies `ok'. The call uses an
%% `infinity' timeout, so the caller blocks until `Tree' has handled
%% the request.
%%
%% NOTE(review): yz_entropy_mgr invokes this from inside its own
%% handle_cast, while a tree process may itself be waiting on
%% yz_entropy_mgr:get_lock/2 (via maybe_rebuild/1). If get_lock/2 is a
%% blocking gen_server call, the two processes could wait on each
%% other forever -- confirm against yz_entropy_mgr.
expire(Tree) ->
gen_server:call(Tree, expire, infinity).

%% @doc Terminate the `Tree'.
%%
%% The `stop' request is sent with `gen_server:cast/2', so this
%% returns `ok' immediately without waiting for `Tree' to act on it.
stop(Tree) ->
gen_server:cast(Tree, stop).
Expand Down Expand Up @@ -140,6 +145,7 @@ init([Index, RPs]) ->
S = #state{index=Index,
trees=orddict:new(),
built=false,
expired=false,
path=Path},
S2 = init_trees(RPs, S),

Expand Down Expand Up @@ -191,6 +197,10 @@ handle_call(destroy, _From, S) ->
S2 = destroy_trees(S),
{stop, normal, ok, S2};

handle_call(expire, _From, S) ->
S2 = S#state{expired=true},
{reply, ok, S2};

handle_call(_Request, _From, S) ->
Reply = ok,
{reply, Reply, S}.
Expand Down Expand Up @@ -268,7 +278,7 @@ init_trees(RPs, S) ->
S2 = lists:foldl(fun(Id, SAcc) ->
do_new_tree(Id, SAcc)
end, S, RPs),
S2#state{built=false, closed=false}.
S2#state{built=false, closed=false, expired=false}.

-spec load_built(state()) -> boolean().
load_built(#state{trees=Trees}) ->
Expand Down Expand Up @@ -361,7 +371,7 @@ do_build_finished(S=#state{index=Index, built=_Pid}) ->
hashtree:write_meta(<<"built">>, <<1>>, Tree0),
hashtree:write_meta(<<"build_time">>, term_to_binary(BuildTime), Tree0),
yz_kv:update_aae_tree_stats(Index, BuildTime),
S#state{built=true, build_time=BuildTime}.
S#state{built=true, build_time=BuildTime, expired=false}.

-spec do_insert({p(),n()}, binary(), binary(), proplist(), state()) -> state().
do_insert(Id, Key, Hash, Opts, S=#state{trees=Trees}) ->
Expand Down Expand Up @@ -459,25 +469,31 @@ do_compare(Id, Remote, AccFun, Acc, From, S) ->
get_index_n(BKey) ->
riak_kv_util:get_index_n(BKey).

-spec do_poke(state()) -> state().
do_poke(S) ->
maybe_build(maybe_clear(S)).
S1 = maybe_rebuild(maybe_expire(S)),
S2 = maybe_build(S1),
S2.

maybe_clear(S=#state{lock=undefined, built=true}) ->
-spec maybe_expire(state()) -> state().
maybe_expire(S=#state{lock=undefined, built=true}) ->
Diff = timer:now_diff(os:timestamp(), S#state.build_time),
case Diff > (?YZ_ENTROPY_EXPIRE * 1000) of
true -> clear_tree(S);
Expire = ?YZ_ENTROPY_EXPIRE,
case (Expire /= never) andalso (Diff > (Expire * 1000)) of
true -> S#state{expired=true};
false -> S
end;

maybe_clear(S) ->
maybe_expire(S) ->
S.

-spec clear_tree(state()) -> state().
clear_tree(S=#state{index=Index}) ->
lager:debug("Clearing tree ~p", [S#state.index]),
S2 = destroy_trees(S),
IndexN = riak_kv_util:responsible_preflists(Index),
S3 = init_trees(IndexN, S2#state{trees=orddict:new()}),
S3#state{built=false}.
S3#state{built=false, expired=false}.

destroy_trees(S) ->
S2 = close_trees(S),
Expand Down Expand Up @@ -514,23 +530,62 @@ close_trees(S=#state{trees=Trees, closed=false}) ->
close_trees(S) ->
S.

build_or_rehash(Self, S=#state{index=Index, trees=Trees}) ->
-spec build_or_rehash(pid(), state()) -> ok.
build_or_rehash(Tree, S) ->
Type = case load_built(S) of
false -> build;
true -> rehash
end,
Lock = yz_entropy_mgr:get_lock(Type),
case {Lock, Type} of
{ok, build} ->
Locked = get_all_locks(Type, self()),
build_or_rehash(Tree, Locked, Type, S).

-spec build_or_rehash(pid(), boolean(), build | rehash, state()) -> ok.
build_or_rehash(Tree, Locked, Type, #state{index=Index, trees=Trees}) ->
case {Locked, Type} of
{true, build} ->
lager:debug("Starting build: ~p", [Index]),
fold_keys(Index, Self),
fold_keys(Index, Tree),
lager:debug("Finished build: ~p", [Index]),
gen_server:cast(Self, build_finished);
{ok, rehash} ->
gen_server:cast(Tree, build_finished);
{true, rehash} ->
lager:debug("Starting rehash: ~p", [Index]),
_ = [hashtree:rehash_tree(T) || {_,T} <- Trees],
lager:debug("Finished rehash: ~p", [Index]),
gen_server:cast(Self, build_finished);
{_Error, _} ->
gen_server:cast(Self, build_failed)
gen_server:cast(Tree, build_finished);
{_, _} ->
gen_server:cast(Tree, build_failed)
end.

%% Start a rebuild of a tree that is built, expired and not locked.
%%
%% A linked helper process is spawned first because the build lock is
%% requested for that process, not for the tree itself. The helper then
%% waits to be told the outcome:
%%
%%   * lock acquired: the tree is cleared, the helper receives the
%%     cleared state and runs the build, and the returned state records
%%     the helper's pid in `built';
%%   * lock not acquired: the helper is told to stop and the state is
%%     returned untouched, leaving the expired tree in place.
%%
%% Any state that is not built+expired+unlocked is returned unchanged.
maybe_rebuild(S=#state{lock=undefined, built=true, expired=true}) ->
    Owner = self(),
    AwaitLock = fun() ->
                        receive
                            {lock, HaveLock, BuildState} ->
                                build_or_rehash(Owner, HaveLock, build, BuildState);
                            stop ->
                                ok
                        end
                end,
    Builder = spawn_link(AwaitLock),
    case get_all_locks(build, Builder) of
        true ->
            Cleared = clear_tree(S),
            Builder ! {lock, true, Cleared},
            Cleared#state{built=Builder};
        _ ->
            Builder ! stop,
            S
    end;
maybe_rebuild(S) ->
    S.

%% Request a `Type' lock from the entropy manager, passing `Pid' along
%% with the request. Returns `true' only when the manager answers `ok';
%% any other answer yields `false'.
-spec get_all_locks(build | rehash, pid()) -> boolean().
get_all_locks(Type, Pid) ->
    %% NOTE: Yokozuna diverges from KV here. KV has a notion of vnode
    %% fold to make sure handoff/aae don't fight each other. Yokozuna
    %% has no vnodes. It would probably be a good idea to add a lock
    %% around Solr so that multiple tree builds don't fight for the
    %% file page cache, but the bg manager stuff is kind of convoluted
    %% and there isn't time to figure this all out for 2.0. Thus,
    %% Yokozuna will not bother with the Solr lock for now.
    case yz_entropy_mgr:get_lock(Type, Pid) of
        ok -> true;
        _ -> false
    end.

0 comments on commit 6b31d32

Please sign in to comment.