From ce3089f20efc66ad519f4cfc3dfb466b651a7ab6 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Tue, 12 Sep 2023 14:05:58 +0100 Subject: [PATCH 1/8] Add test for reaping tombs, and reflecting API changes associated with change --- rebar.config | 2 +- tests/nextgenrepl_deletewithfailure.erl | 4 +- tests/nextgenrepl_reaptombs.erl | 372 ++++++++++++++++++++++++ tests/verify_conditionalput_api.erl | 38 ++- tests/verify_tictacaae_nodedown.erl | 2 +- 5 files changed, 404 insertions(+), 14 deletions(-) create mode 100644 tests/nextgenrepl_reaptombs.erl diff --git a/rebar.config b/rebar.config index f1dc9b1d1..b19acbe9c 100644 --- a/rebar.config +++ b/rebar.config @@ -19,7 +19,7 @@ {getopt, ".*", {git, "https://github.com/jcomellas/getopt", {tag, "v1.0.1"}}}, {meck, {git, "https://github.com/eproxus/meck.git", {tag, "0.8.13"}}}, {mapred_verify, ".*", {git, "https://github.com/nhs-riak/mapred_verify", {branch, "nhse-develop-3.0"}}}, - {riakhttpc, ".*", {git, "https://github.com/nhs-riak/riak-erlang-http-client", {branch, "nhse-develop-3.0"}}}, + {riakhttpc, ".*", {git, "https://github.com/nhs-riak/riak-erlang-http-client", {branch, "nhse-d30-nhskv5"}}}, {kvc, "1.7.0", {git, "https://github.com/etrepum/kvc", {tag, "v1.7.0"}}}, {kv_index_tictactree, ".*", {git, "https://github.com/nhs-riak/kv_index_tictactree.git", {branch, "nhse-develop-3.0"}}} ]}. diff --git a/tests/nextgenrepl_deletewithfailure.erl b/tests/nextgenrepl_deletewithfailure.erl index 4ea74904f..400396ffc 100644 --- a/tests/nextgenrepl_deletewithfailure.erl +++ b/tests/nextgenrepl_deletewithfailure.erl @@ -217,7 +217,7 @@ test_repl(Protocol, [ClusterA, ClusterB]) -> all, count}), lager:info("Counted ~w active keys on B1 all time", [KB]), - {ok, {keys, TBL}} = aae_fold(NodeB1, + {ok, {keysclocks, TBL}} = aae_fold(NodeB1, Protocol, {find_tombs, ?TEST_BUCKET, all, @@ -468,7 +468,7 @@ fullsync_check(Protocol, {SrcNode, SrcNVal, SnkCluster}, AAEResult. length_aae_fold(Node, Protocol, Query) -> - {ok, {keys, List}} = aae_fold(Node, Protocol, Query), + {ok, {_, List}} = aae_fold(Node, Protocol, Query), length(List). aae_fold(Node, Protocol, Query) -> diff --git a/tests/nextgenrepl_reaptombs.erl b/tests/nextgenrepl_reaptombs.erl new file mode 100644 index 000000000..27d82a4bd --- /dev/null +++ b/tests/nextgenrepl_reaptombs.erl @@ -0,0 +1,372 @@ +%% ------------------------------------------------------------------- +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +%% @doc +%% Coordinate the reaping of tombs between rpelicating clusters + +-module(nextgenrepl_reaptombs). +-behavior(riak_test). +-export([confirm/0]). +-export([read_from_cluster/5]). +-include_lib("eunit/include/eunit.hrl"). + +-define(TEST_BUCKET, <<"repl-aae-fullsync-systest_a">>). +-define(A_RING, 8). +-define(B_RING, 16). +-define(A_NVAL, 1). +-define(B_NVAL, 3). + +-define(KEY_COUNT, 20000). + +-define(LOOP_COUNT, 10). +-define(TOMB_PAUSE, 2). + +-define(SNK_WORKERS, 4). + +-define(DELETE_WAIT, 8000). +%% This must be increased, otherwise tombstones may be reaped before their +%% presence can be checked in the test + + +-define(COMMMON_VAL_INIT, <<"CommonValueToWriteForAllObjects">>). +-define(COMMMON_VAL_MOD, <<"CommonValueToWriteForAllModifiedObjects">>). + +-define(CONFIG(RingSize, NVal, DeleteMode), [ + {riak_core, + [ + {ring_creation_size, RingSize}, + {default_bucket_props, + [ + {n_val, NVal}, + {allow_mult, true}, + {dvv_enabled, true} + ]} + ] + }, + {riak_kv, + [ + {anti_entropy, {off, []}}, + {repl_reap, true}, + {log_snk_stacktrace, true}, + {tictacaae_active, active}, + {tictacaae_parallelstore, leveled_ko}, + % if backend not leveled will use parallel key-ordered + % store + {tictacaae_storeheads, true}, + {tictacaae_rebuildwait, 4}, + {tictacaae_rebuilddelay, 3600}, + {tictacaae_exchangetick, 3600 * 1000}, % don't exchange during test + {tictacaae_rebuildtick, 3600000}, % don't tick for an hour! + {ttaaefs_maxresults, 128}, + {tombstone_pause, ?TOMB_PAUSE}, + {delete_mode, DeleteMode} + ]} + ]). + +-define(REPL_CONFIG(LocalClusterName, PeerList, SrcQueueDefns), [ + {riak_kv, + [ + {replrtq_srcqueue, SrcQueueDefns}, + {replrtq_enablesink, true}, + {replrtq_enablesrc, true}, + {replrtq_sinkqueue, LocalClusterName}, + {replrtq_sinkpeers, PeerList}, + {replrtq_sinkworkers, ?SNK_WORKERS} + ]} +]). + + +repl_config(RemoteCluster, LocalClusterName, PeerList) -> + ?REPL_CONFIG( + LocalClusterName, PeerList, atom_to_list(RemoteCluster) ++ ":any"). + + +confirm() -> + [ClusterA, ClusterB] = + rt:deploy_clusters([ + {2, ?CONFIG(?A_RING, ?A_NVAL, keep)}, + {2, ?CONFIG(?B_RING, ?B_NVAL, keep)}]), + + lager:info("Test run using PB protocol an a mix of delete modes"), + test_repl(pb, [ClusterA, ClusterB]), + + pass. + + +test_repl(Protocol, [ClusterA, ClusterB]) -> + + [NodeA1, NodeA2] = ClusterA, + [NodeB1, NodeB2] = ClusterB, + + FoldToPeerConfig = + fun(Node, Acc) -> + {Protocol, {IP, Port}} = + lists:keyfind(Protocol, 1, rt:connection_info(Node)), + Acc0 = case Acc of "" -> ""; _ -> Acc ++ "|" end, + Acc0 ++ IP ++ ":" ++ integer_to_list(Port) + ++ ":" ++ atom_to_list(Protocol) + end, + ClusterASnkPL = lists:foldl(FoldToPeerConfig, "", ClusterB), + ClusterBSnkPL = lists:foldl(FoldToPeerConfig, "", ClusterA), + + ACfg = repl_config(cluster_b, cluster_a, ClusterASnkPL), + BCfg = repl_config(cluster_a, cluster_b, ClusterBSnkPL), + rt:set_advanced_conf(NodeA1, ACfg), + rt:set_advanced_conf(NodeA2, ACfg), + rt:set_advanced_conf(NodeB1, BCfg), + rt:set_advanced_conf(NodeB2, BCfg), + + rt:join_cluster(ClusterA), + rt:join_cluster(ClusterB), + + lager:info("Waiting for convergence."), + rt:wait_until_ring_converged(ClusterA), + rt:wait_until_ring_converged(ClusterB), + lists:foreach( + fun(N) -> rt:wait_for_service(N, riak_kv) end, ClusterA ++ ClusterB), + + write_then_delete(NodeA1, NodeA2, NodeB1, NodeB2), + + {Protocol, {NodeA1ip, NodeA1port}} = + lists:keyfind(Protocol, 1, rt:connection_info(NodeA1)), + lager:info("Following deletes, and waiting for delay - A and B equal"), + root_compare( + Protocol, + {NodeB1, ?B_NVAL, cluster_a}, + {NodeA1ip, NodeA1port, ?A_NVAL}), + + lager:info("Confirm key count of tombs in both clusters"), + {ok, TCA1} = find_tombs(NodeA1, all, all, return_count), + {ok, TCB1} = find_tombs(NodeB1, all, all, return_count), + ?assertEqual(?KEY_COUNT, TCA1), + ?assertEqual(?KEY_COUNT, TCB1), + + reap_from_cluster(NodeA1, local), + lager:info("Confirm all keys reaped from both clusters"), + rt:wait_until( + fun() -> {ok, 0} == find_tombs(NodeA1, all, all, return_count) end), + rt:wait_until( + fun() -> {ok, 0} == find_tombs(NodeB1, all, all, return_count) end), + + pass. + +fullsync_check(Protocol, {SrcNode, SrcNVal, SnkCluster}, + {SinkIP, SinkPort, SinkNVal}) -> + ModRef = riak_kv_ttaaefs_manager, + _ = rpc:call(SrcNode, ModRef, pause, []), + ok = rpc:call(SrcNode, ModRef, set_queuename, [SnkCluster]), + ok = rpc:call(SrcNode, ModRef, set_sink, [Protocol, SinkIP, SinkPort]), + ok = rpc:call(SrcNode, ModRef, set_allsync, [SrcNVal, SinkNVal]), + AAEResult = rpc:call(SrcNode, riak_client, ttaaefs_fullsync, [all_check, 60]), + AAEResult. + + +%% @doc Write a series of keys and ensure they are all written. +write_to_cluster(Node, Start, End, CommonValBin) -> + lager:info("Writing ~p keys to node ~p.", [End - Start + 1, Node]), + lager:warning("Note that only utf-8 keys are used"), + {ok, C} = riak:client_connect(Node), + F = + fun(N, Acc) -> + Key = list_to_binary(io_lib:format("~8..0B", [N])), + Obj = + case CommonValBin of + new_obj -> + CVB = ?COMMMON_VAL_INIT, + riak_object:new(?TEST_BUCKET, + Key, + <>); + UpdateBin -> + UPDV = <>, + {ok, PrevObj} = riak_client:get(?TEST_BUCKET, Key, C), + riak_object:update_value(PrevObj, UPDV) + end, + try riak_client:put(Obj, C) of + ok -> + Acc; + Other -> + [{N, Other} | Acc] + catch + What:Why -> + [{N, {What, Why}} | Acc] + end + end, + Errors = lists:foldl(F, [], lists:seq(Start, End)), + lager:warning("~p errors while writing: ~p", [length(Errors), Errors]), + ?assertEqual([], Errors). + +delete_from_cluster(Node, Start, End) -> + lager:info("Deleting ~p keys from node ~p.", [End - Start + 1, Node]), + lager:warning("Note that only utf-8 keys are used"), + {ok, C} = riak:client_connect(Node), + F = + fun(N, Acc) -> + Key = list_to_binary(io_lib:format("~8..0B", [N])), + try riak_client:delete(?TEST_BUCKET, Key, C) of + ok -> + Acc; + Other -> + [{N, Other} | Acc] + catch + What:Why -> + [{N, {What, Why}} | Acc] + end + end, + Errors = lists:foldl(F, [], lists:seq(Start, End)), + lager:warning("~p errors while deleting: ~p", [length(Errors), Errors]), + ?assertEqual([], Errors). + + +reap_from_cluster(Node, local) -> + lager:info("Auto-reaping found tombs from node ~p", [Node]), + {ok, C} = riak:client_connect(Node), + Query = {reap_tombs, ?TEST_BUCKET, all, all, all, local}, + {ok, Count} = riak_client:aae_fold(Query, C), + ?assertEqual(?KEY_COUNT, Count). + + +read_from_cluster(Node, Start, End, CommonValBin, Errors) -> + lager:info("Reading ~p keys from node ~p.", [End - Start + 1, Node]), + {ok, C} = riak:client_connect(Node), + F = + fun(N, Acc) -> + Key = list_to_binary(io_lib:format("~8..0B", [N])), + case riak_client:get(?TEST_BUCKET, Key, C) of + {ok, Obj} -> + ExpectedVal = <>, + case riak_object:get_values(Obj) of + [ExpectedVal] -> + Acc; + Siblings when length(Siblings) > 1 -> + lager:info( + "Siblings for Key ~s:~n ~w", [Key, Obj]), + [{wrong_value, Key, siblings}|Acc]; + [UnexpectedVal] -> + [{wrong_value, Key, UnexpectedVal}|Acc] + end; + {error, Error} -> + [{fetch_error, Error, Key}|Acc] + end + end, + ErrorsFound = lists:foldl(F, [], lists:seq(Start, End)), + case Errors of + undefined -> + lager:info("Errors Found in read_from_cluster ~w", + [length(ErrorsFound)]), + length(ErrorsFound); + _ -> + ?assertEqual(Errors, length(ErrorsFound)) + end. + + +find_tombs(Node, KR, MR, ResultType) -> + lager:info("Finding tombstones from node ~p.", [Node]), + {ok, C} = riak:client_connect(Node), + case ResultType of + return_keys -> + riak_client:aae_fold({find_tombs, ?TEST_BUCKET, KR, all, MR}, C); + return_count -> + riak_client:aae_fold({reap_tombs, ?TEST_BUCKET, KR, all, MR, count}, C) + end. + + +wait_for_outcome(Module, Func, Args, ExpOutcome, Loops) -> + wait_for_outcome(Module, Func, Args, ExpOutcome, 0, Loops). + +wait_for_outcome(Module, Func, Args, _ExpOutcome, LoopCount, LoopCount) -> + apply(Module, Func, Args); +wait_for_outcome(Module, Func, Args, ExpOutcome, LoopCount, MaxLoops) -> + case apply(Module, Func, Args) of + ExpOutcome -> + ExpOutcome; + NotRightYet -> + lager:info("~w not yet ~w ~w", [Func, ExpOutcome, NotRightYet]), + timer:sleep(LoopCount * 2000), + wait_for_outcome(Module, Func, Args, ExpOutcome, + LoopCount + 1, MaxLoops) + end. + +write_then_delete(NodeA1, NodeA2, NodeB1, NodeB2) -> + lager:info("Write ~w objects into A and read from B", [?KEY_COUNT]), + write_to_cluster(NodeA1, 1, ?KEY_COUNT, new_obj), + lager:info("Waiting to read sample"), + 0 = + wait_for_outcome(?MODULE, + read_from_cluster, + [NodeB1, ?KEY_COUNT - 31, ?KEY_COUNT, + ?COMMMON_VAL_INIT, undefined], + 0, + ?LOOP_COUNT), + lager:info("Waiting to read all"), + 0 = + wait_for_outcome(?MODULE, + read_from_cluster, + [NodeB1, 1, ?KEY_COUNT, ?COMMMON_VAL_INIT, undefined], + 0, + ?LOOP_COUNT), + + lager:info("Deleting ~w objects from B and read not_found from A", [?KEY_COUNT]), + delete_from_cluster(NodeB2, 1, ?KEY_COUNT), + lager:info("Waiting for missing sample"), + 32 = + wait_for_outcome(?MODULE, + read_from_cluster, + [NodeA2, ?KEY_COUNT - 31, ?KEY_COUNT, + ?COMMMON_VAL_INIT, undefined], + 32, + ?LOOP_COUNT), + lager:info("Waiting for all missing"), + ?KEY_COUNT = + wait_for_outcome(?MODULE, + read_from_cluster, + [NodeA2, 1, ?KEY_COUNT, ?COMMMON_VAL_INIT, undefined], + ?KEY_COUNT, + ?LOOP_COUNT), + lager:info("Write and delete cycle confirmed"). + + +root_compare( + Protocol, + {NodeX, XNVAL, QueueName}, + {NodeY, YPort, YNVAL}) -> + timer:sleep(?DELETE_WAIT), + R = + fullsync_check( + Protocol, + {NodeX, XNVAL, QueueName}, + {NodeY, YPort, YNVAL}), + {root_compare, 0} = + case R of + {Outcome, N} when N < 10, Outcome =/= root_compare -> + %% There is a problem here with immediate mode delete + %% in that it can sometimes fail to clean up the odd + %% tombstone. + %% It was for this reason the tombstone_delay was added + %% but amending this cannot stop an intermittent issue + %% Workaround for the purpose of this test is to permit + %% a small discrepancy in this case + lager:warning( + "Immediate delete issue - ~w not cleared ~w", + [N, Outcome]), + timer:sleep(2 * ?DELETE_WAIT), + root_compare( + Protocol, + {NodeX, XNVAL, QueueName}, + {NodeY, YPort, YNVAL}); + R -> + R + end. \ No newline at end of file diff --git a/tests/verify_conditionalput_api.erl b/tests/verify_conditionalput_api.erl index 27a954025..c6c69e7c9 100644 --- a/tests/verify_conditionalput_api.erl +++ b/tests/verify_conditionalput_api.erl @@ -84,15 +84,33 @@ confirm() -> RHCc = rt:httpc(CurrentNode), ok = test_api_consistency(RHCc, rhc, <<"bucketHTTP">>, current), - rt:wait_for_service(PreviousNode, riak_kv), - - RPCp = rt:pbc(PreviousNode), - ok = test_api_consistency(RPCp, riakc_pb_socket, <<"bucketPB">>, previous), - - RHCp = rt:httpc(PreviousNode), - ok = test_api_consistency(RHCp, rhc, <<"bucketHTTP">>, previous), - - pass. + TestMetaData = riak_test_runner:metadata(), + {match, [Vsn]} = + re:run( + proplists:get_value(version, TestMetaData), + "riak-(?[0-9\.]+)", + [{capture, ['VER'], binary}]), + case Vsn > <<"3.0.16">> of + true -> + lager:info("Not testing previous"), + lager:info("Current tested version is ~s", [Vsn]), + lager:info( + "Issues with change of client to support" + " reap_tomb API change in 3.0.17"), + pass; + false -> + rt:wait_for_service(PreviousNode, riak_kv), + + RPCp = rt:pbc(PreviousNode), + ok = test_api_consistency( + RPCp, riakc_pb_socket, <<"bucketPB">>, previous), + + RHCp = rt:httpc(PreviousNode), + ok = test_api_consistency( + RHCp, rhc, <<"bucketHTTP">>, previous), + + pass + end. test_api_consistency(Client, ClientMod, Bucket, Version) -> @@ -324,6 +342,6 @@ check_current_match_conflict(rhc, MatchError) -> ?assertMatch("409", StatusCode). log_tombs(ClientMod, Client, Bucket) -> - {ok, {keys, L}} = ClientMod:aae_find_tombs(Client, Bucket, all, all, all), + {ok, {keysclocks, L}} = ClientMod:aae_find_tombs(Client, Bucket, all, all, all), lager:info("Found ~w tombs", [length(L)]), L. diff --git a/tests/verify_tictacaae_nodedown.erl b/tests/verify_tictacaae_nodedown.erl index 4f7e60be1..c687ef2ec 100644 --- a/tests/verify_tictacaae_nodedown.erl +++ b/tests/verify_tictacaae_nodedown.erl @@ -54,7 +54,7 @@ -define(NUM_KEYS, 5000). -define(BUCKET, <<"test_bucket">>). -define(N_VAL, 3). --define(RETRY_LOOPS, 12). +-define(RETRY_LOOPS, 15). -define(RETRY_PAUSE, 2000). confirm() -> From faa4c27a529dc35d3f72536fe21d9ea399795e1d Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Wed, 13 Sep 2023 13:24:48 +0100 Subject: [PATCH 2/8] Extend test to cover protocols and failure And add to group --- groups/nextgenrepl | 1 + tests/nextgenrepl_reaptombs.erl | 136 +++++++++++++++++++++++++++----- 2 files changed, 118 insertions(+), 19 deletions(-) diff --git a/groups/nextgenrepl b/groups/nextgenrepl index a4146a387..8c826023d 100644 --- a/groups/nextgenrepl +++ b/groups/nextgenrepl @@ -3,6 +3,7 @@ nextgenrepl_bouncingtomb nextgenrepl_deletemodes nextgenrepl_deletewithfailure nextgenrepl_external_reconcile +nextgenrepl_reaptombs nextgenrepl_rtq_auto nextgenrepl_rtq_autocrdt nextgenrepl_rtq_autotypes diff --git a/tests/nextgenrepl_reaptombs.erl b/tests/nextgenrepl_reaptombs.erl index 27d82a4bd..9c4120388 100644 --- a/tests/nextgenrepl_reaptombs.erl +++ b/tests/nextgenrepl_reaptombs.erl @@ -27,7 +27,7 @@ -define(TEST_BUCKET, <<"repl-aae-fullsync-systest_a">>). -define(A_RING, 8). -define(B_RING, 16). --define(A_NVAL, 1). +-define(A_NVAL, 2). -define(B_NVAL, 3). -define(KEY_COUNT, 20000). @@ -62,16 +62,7 @@ {anti_entropy, {off, []}}, {repl_reap, true}, {log_snk_stacktrace, true}, - {tictacaae_active, active}, - {tictacaae_parallelstore, leveled_ko}, - % if backend not leveled will use parallel key-ordered - % store - {tictacaae_storeheads, true}, - {tictacaae_rebuildwait, 4}, - {tictacaae_rebuilddelay, 3600}, - {tictacaae_exchangetick, 3600 * 1000}, % don't exchange during test - {tictacaae_rebuildtick, 3600000}, % don't tick for an hour! - {ttaaefs_maxresults, 128}, + {tictacaae_active, passive}, {tombstone_pause, ?TOMB_PAUSE}, {delete_mode, DeleteMode} ]} @@ -98,19 +89,39 @@ repl_config(RemoteCluster, LocalClusterName, PeerList) -> confirm() -> [ClusterA, ClusterB] = rt:deploy_clusters([ - {2, ?CONFIG(?A_RING, ?A_NVAL, keep)}, - {2, ?CONFIG(?B_RING, ?B_NVAL, keep)}]), + {3, ?CONFIG(?A_RING, ?A_NVAL, keep)}, + {3, ?CONFIG(?B_RING, ?B_NVAL, keep)}]), - lager:info("Test run using PB protocol an a mix of delete modes"), - test_repl(pb, [ClusterA, ClusterB]), + lager:info("***************************************************"), + lager:info("Test run using PB protocol on a healthy cluster"), + lager:info("***************************************************"), + pass = test_repl_reap(pb, [ClusterA, ClusterB]), + + lager:info("***************************************************"), + lager:info("Test run using PB protocol with node failure"), + lager:info("***************************************************"), + pass = test_repl_reap_with_node_down(ClusterA, ClusterB), + + rt:clean_cluster(ClusterA), + rt:clean_cluster(ClusterB), + + [ClusterA, ClusterB] = + rt:deploy_clusters([ + {3, ?CONFIG(?A_RING, ?A_NVAL, keep)}, + {3, ?CONFIG(?B_RING, ?B_NVAL, keep)}]), + + lager:info("***************************************************"), + lager:info("Test run using HTTP protocol on a healthy cluster"), + lager:info("***************************************************"), + pass = test_repl_reap(http, [ClusterA, ClusterB]), pass. -test_repl(Protocol, [ClusterA, ClusterB]) -> +test_repl_reap(Protocol, [ClusterA, ClusterB]) -> - [NodeA1, NodeA2] = ClusterA, - [NodeB1, NodeB2] = ClusterB, + [NodeA1, NodeA2, NodeA3] = ClusterA, + [NodeB1, NodeB2, NodeB3] = ClusterB, FoldToPeerConfig = fun(Node, Acc) -> @@ -127,8 +138,10 @@ test_repl(Protocol, [ClusterA, ClusterB]) -> BCfg = repl_config(cluster_a, cluster_b, ClusterBSnkPL), rt:set_advanced_conf(NodeA1, ACfg), rt:set_advanced_conf(NodeA2, ACfg), + rt:set_advanced_conf(NodeA3, ACfg), rt:set_advanced_conf(NodeB1, BCfg), rt:set_advanced_conf(NodeB2, BCfg), + rt:set_advanced_conf(NodeB3, BCfg), rt:join_cluster(ClusterA), rt:join_cluster(ClusterB), @@ -159,8 +172,93 @@ test_repl(Protocol, [ClusterA, ClusterB]) -> lager:info("Confirm all keys reaped from both clusters"), rt:wait_until( fun() -> {ok, 0} == find_tombs(NodeA1, all, all, return_count) end), + lager:info("All reaped from Cluster A"), + lager:info("Now would expect ClusterB to quickly be in sync"), + lager:info("So waiting only 5 seconds"), + rt:wait_until( + fun() -> {ok, 0} == find_tombs(NodeB1, all, all, return_count) end, + 5, + 1000 + ), + + lager:info( + "Confirm reaps are not looping around - all reaper queues empty"), + ReapQueueFun = + fun(N) -> + {mqueue_lengths, MQLs} = + lists:keyfind( + mqueue_lengths, + 1, + rpc:call(N, riak_kv_reaper, reap_stats, [])), + lager:info("Reap queue lengths ~w on ~w", [MQLs, N]), + QS = lists:sum(lists:map(fun({_P, L}) -> L end, MQLs)), + ?assert(QS == 0) + end, + lists:foreach(ReapQueueFun, ClusterA ++ ClusterB), + pass. + +test_repl_reap_with_node_down(ClusterA, ClusterB) -> + + [NodeA1, NodeA2, _NodeA3] = ClusterA, + [NodeB1, NodeB2, _NodeB3] = ClusterB, + + lager:info("Test again - but with failure in A"), + write_then_delete(NodeA1, NodeA2, NodeB1, NodeB2), + lager:info("Confirm key count of tombs in both clusters"), + {ok, TCA1} = find_tombs(NodeA1, all, all, return_count), + {ok, TCB1} = find_tombs(NodeB1, all, all, return_count), + ?assertEqual(?KEY_COUNT, TCA1), + ?assertEqual(?KEY_COUNT, TCB1), + + lager:info("Stopping node 2 in A"), + rt:stop_and_wait(NodeA2), + + lager:info("Fold to trigger reap of all tombs - whilst node down"), + reap_from_cluster(NodeA1, local), + + rt:start_and_wait(NodeA2), + lists:foreach(fun rt:wait_until_node_handoffs_complete/1, ClusterA), + lager:info("Node 2 restarted"), + + + lager:info("Confirm all keys reaped from both clusters"), + rt:wait_until( + fun() -> {ok, 0} == find_tombs(NodeA1, all, all, return_count) end), + lager:info("All reaped from Cluster A"), + lager:info("Now would expect ClusterB to quickly be in sync"), + lager:info("So waiting only 5 seconds"), + rt:wait_until( + fun() -> {ok, 0} == find_tombs(NodeB1, all, all, return_count) end, + 5, + 1000 + ), + + lager:info("Test again - but with failure in B"), + write_then_delete(NodeA1, NodeA2, NodeB1, NodeB2), + lager:info("Confirm key count of tombs in both clusters"), + {ok, TCA1} = find_tombs(NodeA1, all, all, return_count), + {ok, TCB1} = find_tombs(NodeB1, all, all, return_count), + ?assertEqual(?KEY_COUNT, TCA1), + ?assertEqual(?KEY_COUNT, TCB1), + + lager:info("Stopping node 2 in B"), + rt:stop_and_wait(NodeB2), + + lager:info("Fold to trigger reap of all tombs - whilst node down"), + reap_from_cluster(NodeA1, local), + + rt:start_and_wait(NodeB2), + lists:foreach(fun rt:wait_until_node_handoffs_complete/1, ClusterB), + lager:info("Node 2 restarted"), + + lager:info("Confirm all keys reaped from both clusters"), + rt:wait_until( + fun() -> {ok, 0} == find_tombs(NodeA1, all, all, return_count) end), + lager:info("All reaped from Cluster A"), + lager:info("Now would expect ClusterB to be eventually in sync"), rt:wait_until( - fun() -> {ok, 0} == find_tombs(NodeB1, all, all, return_count) end), + fun() -> {ok, 0} == find_tombs(NodeB1, all, all, return_count) end + ), pass. From 7fb6f199ac2c15af0153edbd654cb04ba6837016 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Wed, 13 Sep 2023 13:34:40 +0100 Subject: [PATCH 3/8] Must have tictacaae enabled But exchange tick set so that it will never run --- tests/nextgenrepl_reaptombs.erl | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/nextgenrepl_reaptombs.erl b/tests/nextgenrepl_reaptombs.erl index 9c4120388..1cd42d248 100644 --- a/tests/nextgenrepl_reaptombs.erl +++ b/tests/nextgenrepl_reaptombs.erl @@ -60,6 +60,16 @@ {riak_kv, [ {anti_entropy, {off, []}}, + {tictacaae_active, active}, + {tictacaae_parallelstore, leveled_ko}, + % if backend not leveled will use parallel key-ordered + % store + {tictacaae_storeheads, true}, + {tictacaae_rebuildwait, 4}, + {tictacaae_rebuilddelay, 3600}, + {tictacaae_exchangetick, 3600 * 1000}, % don't exchange during test + {tictacaae_rebuildtick, 3600000}, % don't tick for an hour! + {ttaaefs_maxresults, 128}, {repl_reap, true}, {log_snk_stacktrace, true}, {tictacaae_active, passive}, From 2a3993049815e29d47fbd75bc0834f7e56ebd299 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Wed, 13 Sep 2023 13:47:19 +0100 Subject: [PATCH 4/8] Update nextgenrepl_reaptombs.erl Remove rogue config item --- tests/nextgenrepl_reaptombs.erl | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/nextgenrepl_reaptombs.erl b/tests/nextgenrepl_reaptombs.erl index 1cd42d248..cd5f5d62a 100644 --- a/tests/nextgenrepl_reaptombs.erl +++ b/tests/nextgenrepl_reaptombs.erl @@ -72,7 +72,6 @@ {ttaaefs_maxresults, 128}, {repl_reap, true}, {log_snk_stacktrace, true}, - {tictacaae_active, passive}, {tombstone_pause, ?TOMB_PAUSE}, {delete_mode, DeleteMode} ]} From 29ab832a3d1139a6969b4a71e91492e663dea3ec Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Wed, 13 Sep 2023 15:18:37 +0100 Subject: [PATCH 5/8] Update to test with bucket types --- tests/nextgenrepl_reaptombs.erl | 138 ++++++++++++++++++++++---------- 1 file changed, 97 insertions(+), 41 deletions(-) diff --git a/tests/nextgenrepl_reaptombs.erl b/tests/nextgenrepl_reaptombs.erl index cd5f5d62a..1e302844c 100644 --- a/tests/nextgenrepl_reaptombs.erl +++ b/tests/nextgenrepl_reaptombs.erl @@ -21,7 +21,7 @@ -module(nextgenrepl_reaptombs). -behavior(riak_test). -export([confirm/0]). --export([read_from_cluster/5]). +-export([read_from_cluster/6]). -include_lib("eunit/include/eunit.hrl"). -define(TEST_BUCKET, <<"repl-aae-fullsync-systest_a">>). @@ -161,7 +161,7 @@ test_repl_reap(Protocol, [ClusterA, ClusterB]) -> lists:foreach( fun(N) -> rt:wait_for_service(N, riak_kv) end, ClusterA ++ ClusterB), - write_then_delete(NodeA1, NodeA2, NodeB1, NodeB2), + write_then_delete(NodeA1, NodeA2, NodeB1, NodeB2, ?TEST_BUCKET), {Protocol, {NodeA1ip, NodeA1port}} = lists:keyfind(Protocol, 1, rt:connection_info(NodeA1)), @@ -172,20 +172,67 @@ test_repl_reap(Protocol, [ClusterA, ClusterB]) -> {NodeA1ip, NodeA1port, ?A_NVAL}), lager:info("Confirm key count of tombs in both clusters"), - {ok, TCA1} = find_tombs(NodeA1, all, all, return_count), - {ok, TCB1} = find_tombs(NodeB1, all, all, return_count), + {ok, TCA1} = find_tombs(NodeA1, ?TEST_BUCKET, all, all, return_count), + {ok, TCB1} = find_tombs(NodeB1, ?TEST_BUCKET, all, all, return_count), ?assertEqual(?KEY_COUNT, TCA1), ?assertEqual(?KEY_COUNT, TCB1), - reap_from_cluster(NodeA1, local), + reap_from_cluster(NodeA1, local, ?TEST_BUCKET), lager:info("Confirm all keys reaped from both clusters"), rt:wait_until( - fun() -> {ok, 0} == find_tombs(NodeA1, all, all, return_count) end), + fun() -> + {ok, 0} == find_tombs(NodeA1, ?TEST_BUCKET, all, all, return_count) + end), lager:info("All reaped from Cluster A"), lager:info("Now would expect ClusterB to quickly be in sync"), lager:info("So waiting only 5 seconds"), rt:wait_until( - fun() -> {ok, 0} == find_tombs(NodeB1, all, all, return_count) end, + fun() -> + {ok, 0} == find_tombs(NodeB1, ?TEST_BUCKET, all, all, return_count) + end, + 5, + 1000 + ), + + lager:info("Setup bucket type for test on both Clusters"), + Type = <<"mytype">>, + TypeProps = [{n_val, 1}], + lager:info("Create bucket type ~p, wait for propagation", [Type]), + rt:create_and_activate_bucket_type(NodeA1, Type, TypeProps), + rt:wait_until_bucket_type_status(Type, active, ClusterA), + rt:wait_until_bucket_props(ClusterA, {Type, <<"bucket">>}, TypeProps), + rt:create_and_activate_bucket_type(NodeB1, Type, TypeProps), + rt:wait_until_bucket_type_status(Type, active, ClusterB), + rt:wait_until_bucket_props(ClusterB, {Type, <<"bucket">>}, TypeProps), + + lager:info("Load keys into typed bucket"), + write_then_delete(NodeA1, NodeA2, NodeB1, NodeB2, {Type, ?TEST_BUCKET}), + + lager:info("Confirm key count of tombs in both clusters"), + {ok, TCA2} = + find_tombs(NodeA1, {Type, ?TEST_BUCKET}, all, all, return_count), + {ok, TCB2} = + find_tombs(NodeB1, {Type, ?TEST_BUCKET}, all, all, return_count), + ?assertEqual(?KEY_COUNT, TCA2), + ?assertEqual(?KEY_COUNT, TCB2), + + reap_from_cluster(NodeA1, local, {Type, ?TEST_BUCKET}), + lager:info("Confirm all keys reaped from both clusters"), + rt:wait_until( + fun() -> + {ok, 0} == + find_tombs( + NodeA1, {Type, ?TEST_BUCKET}, all, all, return_count) + end), + lager:info("All reaped from Cluster A"), + lager:info("Now would expect ClusterB to quickly be in sync"), + lager:info("So waiting only 5 seconds"), + rt:wait_until( + fun() -> + {ok, 0} == + find_tombs( + NodeB1, {Type, ?TEST_BUCKET}, all, all, return_count) + end, 5, 1000 ), @@ -204,6 +251,7 @@ test_repl_reap(Protocol, [ClusterA, ClusterB]) -> ?assert(QS == 0) end, lists:foreach(ReapQueueFun, ClusterA ++ ClusterB), + pass. test_repl_reap_with_node_down(ClusterA, ClusterB) -> @@ -212,10 +260,10 @@ test_repl_reap_with_node_down(ClusterA, ClusterB) -> [NodeB1, NodeB2, _NodeB3] = ClusterB, lager:info("Test again - but with failure in A"), - write_then_delete(NodeA1, NodeA2, NodeB1, NodeB2), + write_then_delete(NodeA1, NodeA2, NodeB1, NodeB2, ?TEST_BUCKET), lager:info("Confirm key count of tombs in both clusters"), - {ok, TCA1} = find_tombs(NodeA1, all, all, return_count), - {ok, TCB1} = find_tombs(NodeB1, all, all, return_count), + {ok, TCA1} = find_tombs(NodeA1, ?TEST_BUCKET, all, all, return_count), + {ok, TCB1} = find_tombs(NodeB1, ?TEST_BUCKET, all, all, return_count), ?assertEqual(?KEY_COUNT, TCA1), ?assertEqual(?KEY_COUNT, TCB1), @@ -223,7 +271,7 @@ test_repl_reap_with_node_down(ClusterA, ClusterB) -> rt:stop_and_wait(NodeA2), lager:info("Fold to trigger reap of all tombs - whilst node down"), - reap_from_cluster(NodeA1, local), + reap_from_cluster(NodeA1, local, ?TEST_BUCKET), rt:start_and_wait(NodeA2), lists:foreach(fun rt:wait_until_node_handoffs_complete/1, ClusterA), @@ -232,21 +280,25 @@ test_repl_reap_with_node_down(ClusterA, ClusterB) -> lager:info("Confirm all keys reaped from both clusters"), rt:wait_until( - fun() -> {ok, 0} == find_tombs(NodeA1, all, all, return_count) end), + fun() -> + {ok, 0} == find_tombs(NodeA1, ?TEST_BUCKET, all, all, return_count) + end), lager:info("All reaped from Cluster A"), lager:info("Now would expect ClusterB to quickly be in sync"), lager:info("So waiting only 5 seconds"), rt:wait_until( - fun() -> {ok, 0} == find_tombs(NodeB1, all, all, return_count) end, + fun() -> + {ok, 0} == find_tombs(NodeB1, ?TEST_BUCKET, all, all, return_count) + end, 5, 1000 ), lager:info("Test again - but with failure in B"), - write_then_delete(NodeA1, NodeA2, NodeB1, NodeB2), + write_then_delete(NodeA1, NodeA2, NodeB1, NodeB2, ?TEST_BUCKET), lager:info("Confirm key count of tombs in both clusters"), - {ok, TCA1} = find_tombs(NodeA1, all, all, return_count), - {ok, TCB1} = find_tombs(NodeB1, all, all, return_count), + {ok, TCA1} = find_tombs(NodeA1, ?TEST_BUCKET, all, all, return_count), + {ok, TCB1} = find_tombs(NodeB1, ?TEST_BUCKET, all, all, return_count), ?assertEqual(?KEY_COUNT, TCA1), ?assertEqual(?KEY_COUNT, TCB1), @@ -254,7 +306,7 @@ test_repl_reap_with_node_down(ClusterA, ClusterB) -> rt:stop_and_wait(NodeB2), lager:info("Fold to trigger reap of all tombs - whilst node down"), - reap_from_cluster(NodeA1, local), + reap_from_cluster(NodeA1, local, ?TEST_BUCKET), rt:start_and_wait(NodeB2), lists:foreach(fun rt:wait_until_node_handoffs_complete/1, ClusterB), @@ -262,12 +314,15 @@ test_repl_reap_with_node_down(ClusterA, ClusterB) -> lager:info("Confirm all keys reaped from both clusters"), rt:wait_until( - fun() -> {ok, 0} == find_tombs(NodeA1, all, all, return_count) end), + fun() -> + {ok, 0} == find_tombs(NodeA1, ?TEST_BUCKET, all, all, return_count) + end), lager:info("All reaped from Cluster A"), lager:info("Now would expect ClusterB to be eventually in sync"), rt:wait_until( - fun() -> {ok, 0} == find_tombs(NodeB1, all, all, return_count) end - ), + fun() -> + {ok, 0} == find_tombs(NodeB1, ?TEST_BUCKET, all, all, return_count) + end), pass. @@ -283,7 +338,7 @@ fullsync_check(Protocol, {SrcNode, SrcNVal, SnkCluster}, %% @doc Write a series of keys and ensure they are all written. -write_to_cluster(Node, Start, End, CommonValBin) -> +write_to_cluster(Node, Bucket, Start, End, CommonValBin) -> lager:info("Writing ~p keys to node ~p.", [End - Start + 1, Node]), lager:warning("Note that only utf-8 keys are used"), {ok, C} = riak:client_connect(Node), @@ -294,12 +349,11 @@ write_to_cluster(Node, Start, End, CommonValBin) -> case CommonValBin of new_obj -> CVB = ?COMMMON_VAL_INIT, - riak_object:new(?TEST_BUCKET, - Key, - <>); + riak_object:new( + Bucket, Key, <>); UpdateBin -> UPDV = <>, - {ok, PrevObj} = riak_client:get(?TEST_BUCKET, Key, C), + {ok, PrevObj} = riak_client:get(Bucket, Key, C), riak_object:update_value(PrevObj, UPDV) end, try riak_client:put(Obj, C) of @@ -316,14 +370,14 @@ write_to_cluster(Node, Start, End, CommonValBin) -> lager:warning("~p errors while writing: ~p", [length(Errors), Errors]), ?assertEqual([], Errors). -delete_from_cluster(Node, Start, End) -> +delete_from_cluster(Node, Bucket, Start, End) -> lager:info("Deleting ~p keys from node ~p.", [End - Start + 1, Node]), lager:warning("Note that only utf-8 keys are used"), {ok, C} = riak:client_connect(Node), F = fun(N, Acc) -> Key = list_to_binary(io_lib:format("~8..0B", [N])), - try riak_client:delete(?TEST_BUCKET, Key, C) of + try riak_client:delete(Bucket, Key, C) of ok -> Acc; Other -> @@ -338,21 +392,21 @@ delete_from_cluster(Node, Start, End) -> ?assertEqual([], Errors). -reap_from_cluster(Node, local) -> +reap_from_cluster(Node, local, Bucket) -> lager:info("Auto-reaping found tombs from node ~p", [Node]), {ok, C} = riak:client_connect(Node), - Query = {reap_tombs, ?TEST_BUCKET, all, all, all, local}, + Query = {reap_tombs, Bucket, all, all, all, local}, {ok, Count} = riak_client:aae_fold(Query, C), ?assertEqual(?KEY_COUNT, Count). -read_from_cluster(Node, Start, End, CommonValBin, Errors) -> +read_from_cluster(Node, Bucket, Start, End, CommonValBin, Errors) -> lager:info("Reading ~p keys from node ~p.", [End - Start + 1, Node]), {ok, C} = riak:client_connect(Node), F = fun(N, Acc) -> Key = list_to_binary(io_lib:format("~8..0B", [N])), - case riak_client:get(?TEST_BUCKET, Key, C) of + case riak_client:get(Bucket, Key, C) of {ok, Obj} -> ExpectedVal = <>, case riak_object:get_values(Obj) of @@ -380,14 +434,14 @@ read_from_cluster(Node, Start, End, CommonValBin, Errors) -> end. -find_tombs(Node, KR, MR, ResultType) -> +find_tombs(Node, Bucket, KR, MR, ResultType) -> lager:info("Finding tombstones from node ~p.", [Node]), {ok, C} = riak:client_connect(Node), case ResultType of return_keys -> - riak_client:aae_fold({find_tombs, ?TEST_BUCKET, KR, all, MR}, C); + riak_client:aae_fold({find_tombs, Bucket, KR, all, MR}, C); return_count -> - riak_client:aae_fold({reap_tombs, ?TEST_BUCKET, KR, all, MR, count}, C) + riak_client:aae_fold({reap_tombs, Bucket, KR, all, MR, count}, C) end. @@ -407,14 +461,14 @@ wait_for_outcome(Module, Func, Args, ExpOutcome, LoopCount, MaxLoops) -> LoopCount + 1, MaxLoops) end. -write_then_delete(NodeA1, NodeA2, NodeB1, NodeB2) -> +write_then_delete(NodeA1, NodeA2, NodeB1, NodeB2, Bucket) -> lager:info("Write ~w objects into A and read from B", [?KEY_COUNT]), - write_to_cluster(NodeA1, 1, ?KEY_COUNT, new_obj), + write_to_cluster(NodeA1, Bucket, 1, ?KEY_COUNT, new_obj), lager:info("Waiting to read sample"), 0 = wait_for_outcome(?MODULE, read_from_cluster, - [NodeB1, ?KEY_COUNT - 31, ?KEY_COUNT, + [NodeB1, Bucket, ?KEY_COUNT - 31, ?KEY_COUNT, ?COMMMON_VAL_INIT, undefined], 0, ?LOOP_COUNT), @@ -422,17 +476,18 @@ write_then_delete(NodeA1, NodeA2, NodeB1, NodeB2) -> 0 = wait_for_outcome(?MODULE, read_from_cluster, - [NodeB1, 1, ?KEY_COUNT, ?COMMMON_VAL_INIT, undefined], + [NodeB1, Bucket, 1, ?KEY_COUNT, + ?COMMMON_VAL_INIT, undefined], 0, ?LOOP_COUNT), lager:info("Deleting ~w objects from B and read not_found from A", [?KEY_COUNT]), - delete_from_cluster(NodeB2, 1, ?KEY_COUNT), + delete_from_cluster(NodeB2, Bucket, 1, ?KEY_COUNT), lager:info("Waiting for missing sample"), 32 = wait_for_outcome(?MODULE, read_from_cluster, - [NodeA2, ?KEY_COUNT - 31, ?KEY_COUNT, + [NodeA2, Bucket, ?KEY_COUNT - 31, ?KEY_COUNT, ?COMMMON_VAL_INIT, undefined], 32, ?LOOP_COUNT), @@ -440,7 +495,8 @@ write_then_delete(NodeA1, NodeA2, NodeB1, NodeB2) -> ?KEY_COUNT = wait_for_outcome(?MODULE, read_from_cluster, - [NodeA2, 1, ?KEY_COUNT, ?COMMMON_VAL_INIT, undefined], + [NodeA2, Bucket, 1, ?KEY_COUNT, + ?COMMMON_VAL_INIT, undefined], ?KEY_COUNT, ?LOOP_COUNT), lager:info("Write and delete cycle confirmed"). From ff24933098ff06782e813890b5231cbd9cd69382 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Wed, 13 Sep 2023 16:23:43 +0100 Subject: [PATCH 6/8] Remove rogue whitespace ProtocolVersionError was not matching due to rogue white space --- src/make_certs.erl | 5 ++++- tests/pb_cipher_suites.erl | 8 +++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/make_certs.erl b/src/make_certs.erl index 5ee1074b5..0a89fb93b 100644 --- a/src/make_certs.erl +++ b/src/make_certs.erl @@ -20,7 +20,7 @@ -module(make_certs). -compile([export_all, nowarn_export_all]). --export([all/1, all/2, rootCA/2, intermediateCA/3, endusers/3, enduser/3, revoke/3, gencrl/2, verify/3]). +-export([all/1, all/2, rootCA/2, intermediateCA/3, endusers/3, enduser/3, revoke/3, gencrl/2, verify/3, version/0]). -record(dn, {commonName, organizationalUnitName = "Basho Engineering", @@ -32,6 +32,9 @@ -define(OpenSSLCmd, "openssl"). +version() -> + os:cmd(?OpenSSLCmd ++ " version"). + all([DataDir, PrivDir]) -> all(DataDir, PrivDir). diff --git a/tests/pb_cipher_suites.erl b/tests/pb_cipher_suites.erl index ac87b509b..e2ac01469 100644 --- a/tests/pb_cipher_suites.erl +++ b/tests/pb_cipher_suites.erl @@ -18,6 +18,8 @@ confirm() -> CertDir = rt_config:get(rt_scratch_dir) ++ "/pb_cipher_suites_certs", %% make a bunch of crypto keys + lager:info("running make_certs with version ~p", [make_certs:version()]), + make_certs:rootCA(CertDir, "rootCA"), make_certs:intermediateCA(CertDir, "intCA", "rootCA"), make_certs:intermediateCA(CertDir, "revokedCA", "rootCA"), @@ -246,7 +248,7 @@ insufficient_check(Port, SingleCipherProps) -> check_reasons({protocol_version, "TLS client: In state hello received SERVER ALERT:" - " Fatal - Protocol Version\n "}) -> + " Fatal - Protocol Version\n"}) -> ok; check_reasons(ProtocolVersionError) -> lager:info("Unexpected error ~s", [ProtocolVersionError]), @@ -303,7 +305,7 @@ check_with_reenabled_protools(Port, CertDir) -> "received CLIENT ALERT: Fatal - Protocol Version"}) -> ok; check_reasons(ProtocolVersionError) -> - lager:info("Unexpected error ~s", [ProtocolVersionError]), + lager:info("Unexpected error ~p", [ProtocolVersionError]), error. insufficient_check(Port, SingleCipherProps) -> @@ -322,7 +324,7 @@ check_with_reenabled_protools(Port, CertDir) -> check_reasons("protocol version") -> ok; check_reasons(ProtocolVersionError) -> - lager:info("Unexpected error ~s", [ProtocolVersionError]), + lager:info("Unexpected error ~p", [ProtocolVersionError]), error. check_with_reenabled_protools(Port, CertDir) -> From c424305a2d83fc458583d76c1ad578a40b5f4c30 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Thu, 14 Sep 2023 12:19:26 +0100 Subject: [PATCH 7/8] Update nextgenrepl_reaptombs.erl --- tests/nextgenrepl_reaptombs.erl | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/tests/nextgenrepl_reaptombs.erl b/tests/nextgenrepl_reaptombs.erl index 1e302844c..9e984e774 100644 --- a/tests/nextgenrepl_reaptombs.erl +++ b/tests/nextgenrepl_reaptombs.erl @@ -16,7 +16,7 @@ %% %% ------------------------------------------------------------------- %% @doc -%% Coordinate the reaping of tombs between rpelicating clusters +%% Coordinate the reaping of tombs between replicating clusters -module(nextgenrepl_reaptombs). -behavior(riak_test). @@ -254,6 +254,7 @@ test_repl_reap(Protocol, [ClusterA, ClusterB]) -> pass. + test_repl_reap_with_node_down(ClusterA, ClusterB) -> [NodeA1, NodeA2, _NodeA3] = ClusterA, @@ -337,6 +338,9 @@ fullsync_check(Protocol, {SrcNode, SrcNVal, SnkCluster}, AAEResult. +to_key(N) -> + list_to_binary(io_lib:format("~8..0B", [N])). + %% @doc Write a series of keys and ensure they are all written. write_to_cluster(Node, Bucket, Start, End, CommonValBin) -> lager:info("Writing ~p keys to node ~p.", [End - Start + 1, Node]), @@ -344,16 +348,15 @@ write_to_cluster(Node, Bucket, Start, End, CommonValBin) -> {ok, C} = riak:client_connect(Node), F = fun(N, Acc) -> - Key = list_to_binary(io_lib:format("~8..0B", [N])), Obj = case CommonValBin of new_obj -> CVB = ?COMMMON_VAL_INIT, riak_object:new( - Bucket, Key, <>); + Bucket, to_key(N), <>); UpdateBin -> UPDV = <>, - {ok, PrevObj} = riak_client:get(Bucket, Key, C), + {ok, PrevObj} = riak_client:get(Bucket, to_key(N), C), riak_object:update_value(PrevObj, UPDV) end, try riak_client:put(Obj, C) of From 03aa878dd330243a003cb88a7ace2e4fa4134c20 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Thu, 14 Sep 2023 21:24:17 +0100 Subject: [PATCH 8/8] Update rebar.config --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index b19acbe9c..f1dc9b1d1 100644 --- a/rebar.config +++ b/rebar.config @@ -19,7 +19,7 @@ {getopt, ".*", {git, "https://github.com/jcomellas/getopt", {tag, "v1.0.1"}}}, {meck, {git, "https://github.com/eproxus/meck.git", {tag, "0.8.13"}}}, {mapred_verify, ".*", {git, "https://github.com/nhs-riak/mapred_verify", {branch, "nhse-develop-3.0"}}}, - {riakhttpc, ".*", {git, "https://github.com/nhs-riak/riak-erlang-http-client", {branch, "nhse-d30-nhskv5"}}}, + {riakhttpc, ".*", {git, "https://github.com/nhs-riak/riak-erlang-http-client", {branch, "nhse-develop-3.0"}}}, {kvc, "1.7.0", {git, "https://github.com/etrepum/kvc", {tag, "v1.7.0"}}}, {kv_index_tictactree, ".*", {git, "https://github.com/nhs-riak/kv_index_tictactree.git", {branch, "nhse-develop-3.0"}}} ]}.