From fd500bbebfdba8228decbdac40b7cdac5ed7d8c3 Mon Sep 17 00:00:00 2001 From: rzezeski Date: Wed, 12 Feb 2014 04:20:47 +0000 Subject: [PATCH 1/6] Allow hashtrees to never expire Allow the value of `never` so that hashtrees never expire. --- src/yz_index_hashtree.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/yz_index_hashtree.erl b/src/yz_index_hashtree.erl index d95462a2..a46d3d29 100644 --- a/src/yz_index_hashtree.erl +++ b/src/yz_index_hashtree.erl @@ -464,7 +464,8 @@ do_poke(S) -> maybe_clear(S=#state{lock=undefined, built=true}) -> Diff = timer:now_diff(os:timestamp(), S#state.build_time), - case Diff > (?YZ_ENTROPY_EXPIRE * 1000) of + Expire = ?YZ_ENTROPY_EXPIRE, + case (Expire /= never) andalso (Diff > (Expire * 1000)) of true -> clear_tree(S); false -> S end; From 3e8d851762e19f1424749ff37bdc2c68cc92badd Mon Sep 17 00:00:00 2001 From: rzezeski Date: Wed, 12 Feb 2014 17:27:42 +0000 Subject: [PATCH 2/6] Clear AAE exchange queue when switching to manual mode When switching to manual mode make sure to clear any pending exchanges in the queue. 
--- src/yz_entropy_mgr.erl | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/yz_entropy_mgr.erl b/src/yz_entropy_mgr.erl index 5530649c..984f1b59 100644 --- a/src/yz_entropy_mgr.erl +++ b/src/yz_entropy_mgr.erl @@ -177,9 +177,15 @@ handle_call(disable, _From, S) -> [yz_index_hashtree:stop(T) || {_,T} <- S#state.trees], {reply, ok, S}; -handle_call({set_mode, Mode}, _From, S) -> - S2 = S#state{mode=Mode}, - {reply, ok, S2}; +handle_call({set_mode, NewMode}, _From, S=#state{mode=CurrentMode}) -> + S2 = case {CurrentMode, NewMode} of + {automatic, manual} -> + S#state{exchange_queue=[]}; + _ -> + S + end, + S3 = S2#state{mode=NewMode}, + {reply, ok, S3}; handle_call(Request, From, S) -> lager:warning("Unexpected call: ~p from ~p", [Request, From]), From 7586f01ea103c084c15cf0e3050eada32a1c36f7 Mon Sep 17 00:00:00 2001 From: rzezeski Date: Thu, 13 Feb 2014 17:43:13 +0000 Subject: [PATCH 3/6] Change AAE to no longer delete trees until ready to rebuild This is a port of riak_kv commit 5c346c347beb7459fe70e53275f958a68633f5fb Before this commit, AAE trees were immediately cleared after reaching their expiration time. However, trees would not be rebuilt until some time in the future depending on rebuild concurrency, lock acquisition, etc. Until rebuilt, AAE exchange was impossible. This commit changes rebuild logic to separate expiration from deletion. Once a tree reaches its expiration time, it is marked as expired but not cleared. AAE exchange against this tree can still occur. Only after an expired tree is actually scheduled for rebuild, and successfully grabs all necessary locks, is the tree cleared. 
--- src/yz_index_hashtree.erl | 88 +++++++++++++++++++++++++++++++-------- 1 file changed, 71 insertions(+), 17 deletions(-) diff --git a/src/yz_index_hashtree.erl b/src/yz_index_hashtree.erl index a46d3d29..b0fd2d92 100644 --- a/src/yz_index_hashtree.erl +++ b/src/yz_index_hashtree.erl @@ -28,6 +28,7 @@ -record(state, {index, built, + expired :: boolean(), lock :: undefined | reference(), path, build_time, @@ -106,6 +107,10 @@ poke(Tree) -> clear(Tree) -> gen_server:cast(Tree, clear). +%% @doc Expire the tree. +expire(Tree) -> + gen_server:call(Tree, expire, infinity). + %% @doc Terminate the `Tree'. stop(Tree) -> gen_server:cast(Tree, stop). @@ -140,6 +145,7 @@ init([Index, RPs]) -> S = #state{index=Index, trees=orddict:new(), built=false, + expired=false, path=Path}, S2 = init_trees(RPs, S), @@ -191,6 +197,10 @@ handle_call(destroy, _From, S) -> S2 = destroy_trees(S), {stop, normal, ok, S2}; +handle_call(expire, _From, S) -> + S2 = S#state{expired=true}, + {reply, ok, S2}; + handle_call(_Request, _From, S) -> Reply = ok, {reply, Reply, S}. @@ -268,7 +278,7 @@ init_trees(RPs, S) -> S2 = lists:foldl(fun(Id, SAcc) -> do_new_tree(Id, SAcc) end, S, RPs), - S2#state{built=false, closed=false}. + S2#state{built=false, closed=false, expired=false}. -spec load_built(state()) -> boolean(). load_built(#state{trees=Trees}) -> @@ -361,7 +371,7 @@ do_build_finished(S=#state{index=Index, built=_Pid}) -> hashtree:write_meta(<<"built">>, <<1>>, Tree0), hashtree:write_meta(<<"build_time">>, term_to_binary(BuildTime), Tree0), yz_kv:update_aae_tree_stats(Index, BuildTime), - S#state{built=true, build_time=BuildTime}. + S#state{built=true, build_time=BuildTime, expired=false}. -spec do_insert({p(),n()}, binary(), binary(), proplist(), state()) -> state(). do_insert(Id, Key, Hash, Opts, S=#state{trees=Trees}) -> @@ -459,26 +469,31 @@ do_compare(Id, Remote, AccFun, Acc, From, S) -> get_index_n(BKey) -> riak_kv_util:get_index_n(BKey). +-spec do_poke(state()) -> state(). 
do_poke(S) -> - maybe_build(maybe_clear(S)). + S1 = maybe_rebuild(maybe_expire(S)), + S2 = maybe_build(S1), + S2. -maybe_clear(S=#state{lock=undefined, built=true}) -> +-spec maybe_expire(state()) -> state(). +maybe_expire(S=#state{lock=undefined, built=true}) -> Diff = timer:now_diff(os:timestamp(), S#state.build_time), Expire = ?YZ_ENTROPY_EXPIRE, case (Expire /= never) andalso (Diff > (Expire * 1000)) of - true -> clear_tree(S); + true -> S#state{expired=true}; false -> S end; -maybe_clear(S) -> +maybe_expire(S) -> S. +-spec clear_tree(state()) -> state(). clear_tree(S=#state{index=Index}) -> lager:debug("Clearing tree ~p", [S#state.index]), S2 = destroy_trees(S), IndexN = riak_kv_util:responsible_preflists(Index), S3 = init_trees(IndexN, S2#state{trees=orddict:new()}), - S3#state{built=false}. + S3#state{built=false, expired=false}. destroy_trees(S) -> S2 = close_trees(S), @@ -515,23 +530,62 @@ close_trees(S=#state{trees=Trees, closed=false}) -> close_trees(S) -> S. -build_or_rehash(Self, S=#state{index=Index, trees=Trees}) -> +-spec build_or_rehash(pid(), state()) -> ok. +build_or_rehash(Tree, S) -> Type = case load_built(S) of false -> build; true -> rehash end, - Lock = yz_entropy_mgr:get_lock(Type), - case {Lock, Type} of - {ok, build} -> + Locked = get_all_locks(Type, self()), + build_or_rehash(Tree, Locked, Type, S). + +-spec build_or_rehash(pid(), boolean(), build | rehash, state()) -> ok. 
+build_or_rehash(Tree, Locked, Type, #state{index=Index, trees=Trees}) -> + case {Locked, Type} of + {true, build} -> lager:debug("Starting build: ~p", [Index]), - fold_keys(Index, Self), + fold_keys(Index, Tree), lager:debug("Finished build: ~p", [Index]), - gen_server:cast(Self, build_finished); - {ok, rehash} -> + gen_server:cast(Tree, build_finished); + {true, rehash} -> lager:debug("Starting rehash: ~p", [Index]), _ = [hashtree:rehash_tree(T) || {_,T} <- Trees], lager:debug("Finished rehash: ~p", [Index]), - gen_server:cast(Self, build_finished); - {_Error, _} -> - gen_server:cast(Self, build_failed) + gen_server:cast(Tree, build_finished); + {_, _} -> + gen_server:cast(Tree, build_failed) end. + +maybe_rebuild(S=#state{lock=undefined, built=true, expired=true}) -> + Tree = self(), + Pid = spawn_link(fun() -> + receive + {lock, Locked, S2} -> + build_or_rehash(Tree, Locked, build, S2); + stop -> + ok + end + end), + Locked = get_all_locks(build, Pid), + case Locked of + true -> + S2 = clear_tree(S), + Pid ! {lock, Locked, S2}, + S2#state{built=Pid}; + _ -> + Pid ! stop, + S + end; +maybe_rebuild(S) -> + S. + +-spec get_all_locks(build | rehash, pid()) -> boolean(). +get_all_locks(Type, Pid) -> + %% NOTE: Yokozuna diverges from KV here. KV has notion of vnode + %% fold to make sure handoff/aae don't fight each other. Yokozuna + %% has no vnodes. It would probably be a good idea to add a lock + %% around Solr so that multiple tree builds don't fight for the + %% file page cache but the bg manager stuff is kind of convoluted + %% and there isn't time to figure this all out for 2.0. Thus, + %% Yokozuna will not bother with the Solr lock for now. + ok == yz_entropy_mgr:get_lock(Type, Pid). From 66aa4a6281055320184b2a878c47d59577b5bb77 Mon Sep 17 00:00:00 2001 From: rzezeski Date: Fri, 14 Feb 2014 03:43:12 +0000 Subject: [PATCH 4/6] Add ability to expire all trees Add the ability to easily expire all hashtrees just like they can easily be cleared. 
This may be useful sometime in production if you don't trust the hashtrees for some reason and is also useful for testing. --- src/yz_entropy_mgr.erl | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/yz_entropy_mgr.erl b/src/yz_entropy_mgr.erl index 984f1b59..793e909c 100644 --- a/src/yz_entropy_mgr.erl +++ b/src/yz_entropy_mgr.erl @@ -125,6 +125,16 @@ cancel_exchanges() -> clear_trees() -> gen_server:cast(?MODULE, clear_trees). +%% @doc Expire all the trees. Expired trees can still be exchanged up +%% until the point they are rebult versus clearing trees which +%% destroys them and prevents exchange from occurring until the tree +%% is rebuilt which can take a long time. +%% +%% @see clear_trees/0 +-spec expire_trees() -> ok. +expire_trees() -> + gen_server:cast(?MODULE, expire_trees). + %%%=================================================================== %%% gen_server callbacks %%%=================================================================== @@ -204,6 +214,10 @@ handle_cast(clear_trees, S) -> clear_all_trees(S#state.trees), {noreply, S}; +handle_cast(expire_trees, S) -> + ok = expire_all_trees(S#state.trees), + {noreply, S}; + handle_cast(_Msg, S) -> lager:warning("Unexpected cast: ~p", [_Msg]), {noreply, S}. @@ -270,6 +284,11 @@ clear_all_exchanges(Exchanges) -> clear_all_trees(Trees) -> [yz_index_hashtree:clear(TPid) || {_, TPid} <- Trees]. +-spec expire_all_trees(trees()) -> ok. +expire_all_trees(Trees) -> + _ = [yz_index_hashtree:expire(TPid) || {_, TPid} <- Trees], + ok. + schedule_reset_build_tokens() -> {_, Reset} = ?YZ_ENTROPY_BUILD_LIMIT, erlang:send_after(Reset, self(), reset_build_tokens). From 73e95f5275ce6757502ca34846e2ef30c21bb959 Mon Sep 17 00:00:00 2001 From: rzezeski Date: Fri, 14 Feb 2014 03:45:26 +0000 Subject: [PATCH 5/6] Add test to verify that expiration does not prevent exchange Make sure that when trees are expired it does not prevent them from exchanging. 
The entire purpose of expiration is to provide behavior similar to clear except that exchange can occur up until the point of tree rebuilding. --- riak_test/aae_test.erl | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/riak_test/aae_test.erl b/riak_test/aae_test.erl index bed2c516..b1b443bd 100644 --- a/riak_test/aae_test.erl +++ b/riak_test/aae_test.erl @@ -80,6 +80,8 @@ confirm() -> _ = verify_no_repair_after_restart(Cluster), + _ = verify_exchange_after_expire(Cluster), + pass. %% @doc Create a tuple containing a riak object and the first @@ -145,6 +147,29 @@ setup_index(Cluster, PBConn, YZBenchDir) -> ok = yz_rt:set_index(Node, ?BUCKET, ?INDEX), ok = yz_rt:wait_for_index(Cluster, ?INDEX). +%% @doc Verify that expired trees do not prevent exchange from +%% occurring (like clearing trees does). +-spec verify_exchange_after_expire([node()]) -> ok. +verify_exchange_after_expire(Cluster) -> + lager:info("Expire all trees and verify exchange still happens"), + lager:info("Set anti_entropy_build_limit to 0 so that trees can't be built"), + _ = [ok = rpc:call(Node, application, set_env, [riak_kv, anti_entropy_build_limit, {0, 1000}]) + || Node <- Cluster], + + lager:info("Expire all trees"), + _ = [ok = rpc:call(Node, yz_entropy_mgr, expire_trees, []) + || Node <- Cluster], + + %% The expire is done very cast so just give it a moment + timer:sleep(100), + + ok = yz_rt:wait_for_full_exchange_round(Cluster, now()), + + lager:info("Set anti_entropy_build_limit to 100 so trees can build again"), + _ = [ok = rpc:call(Node, application, set_env, [riak_kv, anti_entropy_build_limit, {100, 1000}]) + || Node <- Cluster], + ok. + %% @doc Verify that Yokozuna deletes postings which have no %% corresponding KV object. -spec verify_removal_of_orphan_postings([node()]) -> ok. 
From 5c21cab9dbde3d02dc2d547cac3ec95dee1895dd Mon Sep 17 00:00:00 2001 From: rzezeski Date: Tue, 18 Feb 2014 18:13:06 +0000 Subject: [PATCH 6/6] Fix typos --- riak_test/aae_test.erl | 2 +- src/yz_entropy_mgr.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/riak_test/aae_test.erl b/riak_test/aae_test.erl index b1b443bd..f5e7ee7f 100644 --- a/riak_test/aae_test.erl +++ b/riak_test/aae_test.erl @@ -160,7 +160,7 @@ verify_exchange_after_expire(Cluster) -> _ = [ok = rpc:call(Node, yz_entropy_mgr, expire_trees, []) || Node <- Cluster], - %% The expire is done very cast so just give it a moment + %% The expire is async so just give it a moment timer:sleep(100), ok = yz_rt:wait_for_full_exchange_round(Cluster, now()), diff --git a/src/yz_entropy_mgr.erl b/src/yz_entropy_mgr.erl index 793e909c..8bb43864 100644 --- a/src/yz_entropy_mgr.erl +++ b/src/yz_entropy_mgr.erl @@ -126,7 +126,7 @@ clear_trees() -> gen_server:cast(?MODULE, clear_trees). %% @doc Expire all the trees. Expired trees can still be exchanged up -%% until the point they are rebult versus clearing trees which +%% until the point they are rebuilt versus clearing trees which %% destroys them and prevents exchange from occurring until the tree %% is rebuilt which can take a long time. %%