diff --git a/riak_test/aae_test.erl b/riak_test/aae_test.erl index f2e93965..6eafa0f1 100644 --- a/riak_test/aae_test.erl +++ b/riak_test/aae_test.erl @@ -80,6 +80,8 @@ confirm() -> _ = verify_no_repair_after_restart(Cluster), + _ = verify_exchange_after_expire(Cluster), + pass. %% @doc Create a tuple containing a riak object and the first @@ -145,6 +147,29 @@ setup_index(Cluster, PBConn, YZBenchDir) -> ok = yz_rt:wait_for_index(Cluster, ?INDEX), ok = yz_rt:set_index(Node, ?BUCKET, ?INDEX). +%% @doc Verify that expired trees do not prevent exchange from +%% occurring (like clearing trees does). +-spec verify_exchange_after_expire([node()]) -> ok. +verify_exchange_after_expire(Cluster) -> + lager:info("Expire all trees and verify exchange still happens"), + lager:info("Set anti_entropy_build_limit to 0 so that trees can't be built"), + _ = [ok = rpc:call(Node, application, set_env, [riak_kv, anti_entropy_build_limit, {0, 1000}]) + || Node <- Cluster], + + lager:info("Expire all trees"), + _ = [ok = rpc:call(Node, yz_entropy_mgr, expire_trees, []) + || Node <- Cluster], + + %% The expire is async so just give it a moment + timer:sleep(100), + + ok = yz_rt:wait_for_full_exchange_round(Cluster, now()), + + lager:info("Set anti_entropy_build_limit to 100 so trees can build again"), + _ = [ok = rpc:call(Node, application, set_env, [riak_kv, anti_entropy_build_limit, {100, 1000}]) + || Node <- Cluster], + ok. + %% @doc Verify that Yokozuna deletes postings which have no %% corresponding KV object. -spec verify_removal_of_orphan_postings([node()]) -> ok. diff --git a/src/yz_entropy_mgr.erl b/src/yz_entropy_mgr.erl index 5530649c..8bb43864 100644 --- a/src/yz_entropy_mgr.erl +++ b/src/yz_entropy_mgr.erl @@ -125,6 +125,16 @@ cancel_exchanges() -> clear_trees() -> gen_server:cast(?MODULE, clear_trees). +%% @doc Expire all the trees. 
Expired trees can still be exchanged up +%% until the point they are rebuilt versus clearing trees which +%% destroys them and prevents exchange from occurring until the tree +%% is rebuilt which can take a long time. +%% +%% @see clear_trees/0 +-spec expire_trees() -> ok. +expire_trees() -> + gen_server:cast(?MODULE, expire_trees). + %%%=================================================================== %%% gen_server callbacks %%%=================================================================== @@ -177,9 +187,15 @@ handle_call(disable, _From, S) -> [yz_index_hashtree:stop(T) || {_,T} <- S#state.trees], {reply, ok, S}; -handle_call({set_mode, Mode}, _From, S) -> - S2 = S#state{mode=Mode}, - {reply, ok, S2}; +handle_call({set_mode, NewMode}, _From, S=#state{mode=CurrentMode}) -> + S2 = case {CurrentMode, NewMode} of + {automatic, manual} -> + S#state{exchange_queue=[]}; + _ -> + S + end, + S3 = S2#state{mode=NewMode}, + {reply, ok, S3}; handle_call(Request, From, S) -> lager:warning("Unexpected call: ~p from ~p", [Request, From]), @@ -198,6 +214,10 @@ handle_cast(clear_trees, S) -> clear_all_trees(S#state.trees), {noreply, S}; +handle_cast(expire_trees, S) -> + ok = expire_all_trees(S#state.trees), + {noreply, S}; + handle_cast(_Msg, S) -> lager:warning("Unexpected cast: ~p", [_Msg]), {noreply, S}. @@ -264,6 +284,11 @@ clear_all_exchanges(Exchanges) -> clear_all_trees(Trees) -> [yz_index_hashtree:clear(TPid) || {_, TPid} <- Trees]. +-spec expire_all_trees(trees()) -> ok. +expire_all_trees(Trees) -> + _ = [yz_index_hashtree:expire(TPid) || {_, TPid} <- Trees], + ok. + schedule_reset_build_tokens() -> {_, Reset} = ?YZ_ENTROPY_BUILD_LIMIT, erlang:send_after(Reset, self(), reset_build_tokens). 
diff --git a/src/yz_index_hashtree.erl b/src/yz_index_hashtree.erl index d95462a2..b0fd2d92 100644 --- a/src/yz_index_hashtree.erl +++ b/src/yz_index_hashtree.erl @@ -28,6 +28,7 @@ -record(state, {index, built, + expired :: boolean(), lock :: undefined | reference(), path, build_time, @@ -106,6 +107,10 @@ poke(Tree) -> clear(Tree) -> gen_server:cast(Tree, clear). +%% @doc Expire the tree. +expire(Tree) -> + gen_server:call(Tree, expire, infinity). + %% @doc Terminate the `Tree'. stop(Tree) -> gen_server:cast(Tree, stop). @@ -140,6 +145,7 @@ init([Index, RPs]) -> S = #state{index=Index, trees=orddict:new(), built=false, + expired=false, path=Path}, S2 = init_trees(RPs, S), @@ -191,6 +197,10 @@ handle_call(destroy, _From, S) -> S2 = destroy_trees(S), {stop, normal, ok, S2}; +handle_call(expire, _From, S) -> + S2 = S#state{expired=true}, + {reply, ok, S2}; + handle_call(_Request, _From, S) -> Reply = ok, {reply, Reply, S}. @@ -268,7 +278,7 @@ init_trees(RPs, S) -> S2 = lists:foldl(fun(Id, SAcc) -> do_new_tree(Id, SAcc) end, S, RPs), - S2#state{built=false, closed=false}. + S2#state{built=false, closed=false, expired=false}. -spec load_built(state()) -> boolean(). load_built(#state{trees=Trees}) -> @@ -361,7 +371,7 @@ do_build_finished(S=#state{index=Index, built=_Pid}) -> hashtree:write_meta(<<"built">>, <<1>>, Tree0), hashtree:write_meta(<<"build_time">>, term_to_binary(BuildTime), Tree0), yz_kv:update_aae_tree_stats(Index, BuildTime), - S#state{built=true, build_time=BuildTime}. + S#state{built=true, build_time=BuildTime, expired=false}. -spec do_insert({p(),n()}, binary(), binary(), proplist(), state()) -> state(). do_insert(Id, Key, Hash, Opts, S=#state{trees=Trees}) -> @@ -459,25 +469,31 @@ do_compare(Id, Remote, AccFun, Acc, From, S) -> get_index_n(BKey) -> riak_kv_util:get_index_n(BKey). +-spec do_poke(state()) -> state(). do_poke(S) -> - maybe_build(maybe_clear(S)). + S1 = maybe_rebuild(maybe_expire(S)), + S2 = maybe_build(S1), + S2. 
-maybe_clear(S=#state{lock=undefined, built=true}) -> +-spec maybe_expire(state()) -> state(). +maybe_expire(S=#state{lock=undefined, built=true}) -> Diff = timer:now_diff(os:timestamp(), S#state.build_time), - case Diff > (?YZ_ENTROPY_EXPIRE * 1000) of - true -> clear_tree(S); + Expire = ?YZ_ENTROPY_EXPIRE, + case (Expire /= never) andalso (Diff > (Expire * 1000)) of + true -> S#state{expired=true}; false -> S end; -maybe_clear(S) -> +maybe_expire(S) -> S. +-spec clear_tree(state()) -> state(). clear_tree(S=#state{index=Index}) -> lager:debug("Clearing tree ~p", [S#state.index]), S2 = destroy_trees(S), IndexN = riak_kv_util:responsible_preflists(Index), S3 = init_trees(IndexN, S2#state{trees=orddict:new()}), - S3#state{built=false}. + S3#state{built=false, expired=false}. destroy_trees(S) -> S2 = close_trees(S), @@ -514,23 +530,62 @@ close_trees(S=#state{trees=Trees, closed=false}) -> close_trees(S) -> S. -build_or_rehash(Self, S=#state{index=Index, trees=Trees}) -> +-spec build_or_rehash(pid(), state()) -> ok. +build_or_rehash(Tree, S) -> Type = case load_built(S) of false -> build; true -> rehash end, - Lock = yz_entropy_mgr:get_lock(Type), - case {Lock, Type} of - {ok, build} -> + Locked = get_all_locks(Type, self()), + build_or_rehash(Tree, Locked, Type, S). + +-spec build_or_rehash(pid(), boolean(), build | rehash, state()) -> ok. 
+build_or_rehash(Tree, Locked, Type, #state{index=Index, trees=Trees}) -> + case {Locked, Type} of - {ok, build} -> + {true, build} -> lager:debug("Starting build: ~p", [Index]), - fold_keys(Index, Self), + fold_keys(Index, Tree), lager:debug("Finished build: ~p", [Index]), - gen_server:cast(Self, build_finished); - {ok, rehash} -> + gen_server:cast(Tree, build_finished); + {true, rehash} -> lager:debug("Starting rehash: ~p", [Index]), _ = [hashtree:rehash_tree(T) || {_,T} <- Trees], lager:debug("Finished rehash: ~p", [Index]), - gen_server:cast(Self, build_finished); - {_Error, _} -> - gen_server:cast(Self, build_failed) + gen_server:cast(Tree, build_finished); + {_, _} -> + gen_server:cast(Tree, build_failed) end. + +maybe_rebuild(S=#state{lock=undefined, built=true, expired=true}) -> + Tree = self(), + Pid = spawn_link(fun() -> + receive + {lock, Locked, S2} -> + build_or_rehash(Tree, Locked, build, S2); + stop -> + ok + end + end), + Locked = get_all_locks(build, Pid), + case Locked of + true -> + S2 = clear_tree(S), + Pid ! {lock, Locked, S2}, + S2#state{built=Pid}; + _ -> + Pid ! stop, + S + end; +maybe_rebuild(S) -> + S. + +-spec get_all_locks(build | rehash, pid()) -> boolean(). +get_all_locks(Type, Pid) -> + %% NOTE: Yokozuna diverges from KV here. KV has notion of vnode + %% fold to make sure handoff/aae don't fight each other. Yokozuna + %% has no vnodes. It would probably be a good idea to add a lock + %% around Solr so that multiple tree builds don't fight for the + %% file page cache but the bg manager stuff is kind of convoluted + %% and there isn't time to figure this all out for 2.0. Thus, + %% Yokozuna will not bother with the Solr lock for now. + ok == yz_entropy_mgr:get_lock(Type, Pid).