Skip to content

Commit

Permalink
add fns and test to get active preflist w/ partition nums, annotated,…
Browse files Browse the repository at this point in the history
… by bucket-key
  • Loading branch information
zeeshanlakhani committed Feb 18, 2015
1 parent 7898729 commit b4807c5
Show file tree
Hide file tree
Showing 3 changed files with 214 additions and 69 deletions.
224 changes: 184 additions & 40 deletions src/riak_core_apl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@
%% -------------------------------------------------------------------
-module(riak_core_apl).
-export([active_owners/1, active_owners/2,
get_apl/3, get_apl/4, get_apl_ann/3, get_apl_ann/4,
get_apl/3, get_apl/4,
get_apl_ann/2, get_apl_ann/3, get_apl_ann/4,
get_apl_ann_with_pnum/1,
get_primary_apl/3, get_primary_apl/4,
get_primary_apl_chbin/4,
first_up/2, offline_owners/1, offline_owners/2
]).

-export_type([preflist/0, preflist_ann/0]).

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
Expand All @@ -40,13 +43,19 @@
-type ring() :: riak_core_ring:riak_core_ring().
-type preflist() :: [{index(), node()}].
-type preflist_ann() :: [{{index(), node()}, primary|fallback}].
%% @type preflist_with_pnum_ann -
%% Annoated preflist where the partition value is an id/number
%% (0 to ring_size-1) instead of a hash.
-type preflist_with_pnum_ann() :: [{{partition_id(), node()}, primary|fallback}].
-type iterator() :: term().
-type chashbin() :: term().
-type docidx() :: chash:index().
-type partition_id() :: non_neg_integer().
-type ring_size() :: non_neg_integer().

%% Return preflist of all active primary nodes (with no
%% substituion of fallbacks). Used to simulate a
%% preflist with N=ring_size
%% @doc Return preflist of all active primary nodes (with no
%% substituion of fallbacks). Used to simulate a
%% preflist with N=ring_size.
-spec active_owners(atom()) -> preflist_ann().
active_owners(Service) ->
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
Expand All @@ -59,62 +68,81 @@ active_owners(Ring, UpNodes) ->
{Up, _Pangs} = check_up(Primaries, UpNodes1, [], []),
Up.

%% Get the active preflist taking account of which nodes are up
%% @doc Get the active preflist taking account of which nodes are up.
-spec get_apl(docidx(), n_val(), atom()) -> preflist().
get_apl(DocIdx, N, Service) ->
{ok, CHBin} = riak_core_ring_manager:get_chash_bin(),
get_apl_chbin(DocIdx, N, CHBin, riak_core_node_watcher:nodes(Service)).

%% Get the active preflist taking account of which nodes are up
%% for a given chash/upnodes list
%% @doc Get the active preflist taking account of which nodes are up
%% for a given chash/upnodes list.
-spec get_apl_chbin(docidx(), n_val(), chashbin:chashbin(), [node()]) -> preflist().
get_apl_chbin(DocIdx, N, CHBin, UpNodes) ->
[{Partition, Node} || {{Partition, Node}, _Type} <-
get_apl_ann_chbin(DocIdx, N, CHBin, UpNodes)].

%% Get the active preflist taking account of which nodes are up
%% for a given ring/upnodes list
%% @doc Get the active preflist taking account of which nodes are up
%% for a given ring/upnodes list.
-spec get_apl(docidx(), n_val(), ring(), [node()]) -> preflist().
get_apl(DocIdx, N, Ring, UpNodes) ->
[{Partition, Node} || {{Partition, Node}, _Type} <-
[{Partition, Node} || {{Partition, Node}, _Type} <-
get_apl_ann(DocIdx, N, Ring, UpNodes)].

%% Get the active preflist taking account of which nodes are up
%% and annotate each node with type of primary/fallback
%% @doc Get the active preflist taking account of which nodes are up for a given
%% chash/upnodes list and annotate each node with type of primary/fallback.
get_apl_ann(DocIdx, N, UpNodes) ->
{ok, CHBin} = riak_core_ring_manager:get_chash_bin(),
get_apl_ann_chbin(DocIdx, N, CHBin, UpNodes).

%% Get the active preflist taking account of which nodes are up
%% for a given chash/upnodes list and annotate each node with type of
%% primary/fallback
-spec get_apl_ann_chbin(binary(), n_val(), chashbin(), [node()]) -> preflist_ann().
get_apl_ann_chbin(DocIdx, N, CHBin, UpNodes) ->
UpNodes1 = UpNodes,
Itr = chashbin:iterator(DocIdx, CHBin),
{Primaries, Itr2} = chashbin:itr_pop(N, Itr),
{Up, Pangs} = check_up(Primaries, UpNodes1, [], []),
Up ++ find_fallbacks_chbin(Pangs, Itr2, UpNodes1, []).

%% Get the active preflist taking account of which nodes are up
%% for a given ring/upnodes list and annotate each node with type of
%% primary/fallback
%% @doc Get the active preflist taking account of which nodes are up
%% for a given ring/upnodes list and annotate each node with type of
%% primary/fallback.
-spec get_apl_ann(binary(), n_val(), ring(), [node()]) -> preflist_ann().
get_apl_ann(DocIdx, N, Ring, UpNodes) ->
UpNodes1 = UpNodes,
Preflist = riak_core_ring:preflist(DocIdx, Ring),

{Primaries, Fallbacks} = lists:split(N, Preflist),
{Up, Pangs} = check_up(Primaries, UpNodes1, [], []),
Up ++ find_fallbacks(Pangs, Fallbacks, UpNodes1, []).

%% Same as get_apl, but returns only the primaries.
%% @doc Get the active preflist for a given {bucket, key} and list of nodes
%% and annotate each node with type of primary/fallback.
-spec get_apl_ann({binary(), binary()}, [node()]) -> preflist_ann().
get_apl_ann({Bucket, Key}, UpNodes) ->
BucketProps = riak_core_bucket:get_bucket(Bucket),
NVal = proplists:get_value(n_val, BucketProps),
DocIdx = riak_core_util:chash_key({Bucket, Key}),
get_apl_ann(DocIdx, NVal, UpNodes).

%% @doc Get the active preflist taking account of which nodes are up
%% for a given {bucket, key} and annotate each node with type of
%% primary/fallback
-spec get_apl_ann_with_pnum({binary(), binary()}) -> preflist_with_pnum_ann().
get_apl_ann_with_pnum(BKey) ->
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
UpNodes = riak_core_ring:all_members(Ring),
Apl = get_apl_ann(BKey, UpNodes),
Size = riak_core_ring:num_partitions(Ring),
apl_with_partition_nums(Apl, Size).

%% @doc Get the active preflist taking account of which nodes are up
%% for a given chash/upnodes list and annotate each node with type of
%% primary/fallback.
-spec get_apl_ann_chbin(binary(), n_val(), chashbin(), [node()]) -> preflist_ann().
get_apl_ann_chbin(DocIdx, N, CHBin, UpNodes) ->
UpNodes1 = UpNodes,
Itr = chashbin:iterator(DocIdx, CHBin),
{Primaries, Itr2} = chashbin:itr_pop(N, Itr),
{Up, Pangs} = check_up(Primaries, UpNodes1, [], []),
Up ++ find_fallbacks_chbin(Pangs, Itr2, UpNodes1, []).

%% @doc Same as get_apl, but returns only the primaries.
-spec get_primary_apl(binary(), n_val(), atom()) -> preflist_ann().
get_primary_apl(DocIdx, N, Service) ->
{ok, CHBin} = riak_core_ring_manager:get_chash_bin(),
get_primary_apl_chbin(DocIdx, N, CHBin, riak_core_node_watcher:nodes(Service)).

%% Same as get_apl, but returns only the primaries.
%% @doc Same as get_apl, but returns only the primaries.
-spec get_primary_apl_chbin(binary(), n_val(), chashbin(), [node()]) -> preflist_ann().
get_primary_apl_chbin(DocIdx, N, CHBin, UpNodes) ->
UpNodes1 = UpNodes,
Expand All @@ -123,7 +151,7 @@ get_primary_apl_chbin(DocIdx, N, CHBin, UpNodes) ->
{Up, _} = check_up(Primaries, UpNodes1, [], []),
Up.

%% Same as get_apl, but returns only the primaries.
%% @doc Same as get_apl, but returns only the primaries.
-spec get_primary_apl(binary(), n_val(), ring(), [node()]) -> preflist_ann().
get_primary_apl(DocIdx, N, Ring, UpNodes) ->
UpNodes1 = UpNodes,
Expand All @@ -132,8 +160,8 @@ get_primary_apl(DocIdx, N, Ring, UpNodes) ->
{Up, _} = check_up(Primaries, UpNodes1, [], []),
Up.

%% Return the first entry that is up in the preflist for `DocIdx'. This
%% will crash if all owning nodes are offline.
%% @doc Return the first entry that is up in the preflist for `DocIdx'. This
%% will crash if all owning nodes are offline.
first_up(DocIdx, Service) ->
{ok, CHBin} = riak_core_ring_manager:get_chash_bin(),
Itr = chashbin:iterator(DocIdx, CHBin),
Expand All @@ -154,7 +182,7 @@ offline_owners(Service, CHBin) ->
end, CHBin),
DownVNodes.

%% Split a preference list into up and down lists
%% @doc Split a preference list into up and down lists.
-spec check_up(preflist(), [node()], preflist_ann(), preflist()) -> {preflist_ann(), preflist()}.
check_up([], _UpNodes, Up, Pangs) ->
{lists:reverse(Up), lists:reverse(Pangs)};
Expand All @@ -166,7 +194,7 @@ check_up([{Partition,Node}|Rest], UpNodes, Up, Pangs) ->
check_up(Rest, UpNodes, Up, [{Partition, Node} | Pangs])
end.

%% Find fallbacks for downed nodes in the preference list
%% @doc Find fallbacks for downed nodes in the preference list.
-spec find_fallbacks(preflist(), preflist(), [node()], preflist_ann()) -> preflist_ann().
find_fallbacks(_Pangs, [], _UpNodes, Secondaries) ->
lists:reverse(Secondaries);
Expand All @@ -181,7 +209,7 @@ find_fallbacks([{Partition, _Node}|Rest]=Pangs, [{_,FN}|Fallbacks], UpNodes, Sec
find_fallbacks(Pangs, Fallbacks, UpNodes, Secondaries)
end.

%% Find fallbacks for downed nodes in the preference list
%% @doc Find fallbacks for downed nodes in the preference list.
-spec find_fallbacks_chbin(preflist(), iterator(),[node()], preflist_ann()) -> preflist_ann().
find_fallbacks_chbin([], _Fallbacks, _UpNodes, Secondaries) ->
lists:reverse(Secondaries);
Expand All @@ -198,10 +226,17 @@ find_fallbacks_chbin([{Partition, _Node}|Rest]=Pangs, Itr, UpNodes, Secondaries)
find_fallbacks_chbin(Pangs, Itr2, UpNodes, Secondaries)
end.

%% Return true if a node is up
%% @doc Return true if a node is up.
is_up(Node, UpNodes) ->
lists:member(Node, UpNodes).

%% @doc Return annotated preflist with partition ids/nums instead of hashes.
-spec apl_with_partition_nums(preflist_ann(), ring_size()) ->
preflist_with_pnum_ann().
apl_with_partition_nums(Apl, Size) ->
[{{riak_core_ring_util:hash_to_partition_id(Hash, Size), Node}, Ann} ||
{{Hash, Node}, Ann} <- Apl].

-ifdef(TEST).

smallest_test() ->
Expand All @@ -213,12 +248,12 @@ four_node_test() ->
Ring = perfect_ring(8, Nodes),
?assertEqual([{0,nodea},
{182687704666362864775460604089535377456991567872,nodeb},
{365375409332725729550921208179070754913983135744,nodec}],
{365375409332725729550921208179070754913983135744,nodec}],
get_apl(last_in_ring(), 3, Ring, Nodes)),
%% With a node down
?assertEqual([{182687704666362864775460604089535377456991567872,nodeb},
{365375409332725729550921208179070754913983135744,nodec},
{0,noded}],
{0,noded}],
get_apl(last_in_ring(), 3, Ring, [nodeb, nodec, noded])),
%% With two nodes down
?assertEqual([{365375409332725729550921208179070754913983135744,nodec},
Expand All @@ -231,8 +266,7 @@ four_node_test() ->
{365375409332725729550921208179070754913983135744,nodea}],
get_apl(last_in_ring(), 3, Ring, [nodea, nodeb])).


%% Create a perfect ring - RingSize must be a multiple of nodes
%% @doc Create a perfect ring - RingSize must be a multiple of nodes
perfect_ring(RingSize, Nodes) when RingSize rem length(Nodes) =:= 0 ->
Ring = riak_core_ring:fresh(RingSize,node()),
Owners = riak_core_ring:all_owners(Ring),
Expand Down Expand Up @@ -326,6 +360,116 @@ six_node_test() ->

ok.

six_node_bucket_key_ann_test() ->
{ok, [Ring0]} = file:consult("../test/my_ring"),
Nodes = ['[email protected]', '[email protected]', '[email protected]',
'[email protected]', '[email protected]', '[email protected]'],
Ring = riak_core_ring:upgrade(Ring0),
Bucket = <<"favorite">>,
Key = <<"jethrotull">>,
application:set_env(riak_core, default_bucket_props,
[{n_val, 3},
{chash_keyfun,{riak_core_util,chash_std_keyfun}}]),
riak_core_ring_manager:setup_ets(test),
riak_core_ring_manager:set_ring_global(Ring),
Size = riak_core_ring:num_partitions(Ring),
?assertEqual([{{34,
'[email protected]'},
primary},
{{35,
'[email protected]'},
primary},
{{36,
'[email protected]'},
primary}],
apl_with_partition_nums(
get_apl_ann({Bucket, Key}, Nodes), Size)),
?assertEqual([{{35,
'[email protected]'},
primary},
{{36,
'[email protected]'},
primary},
{{34,
'[email protected]'},
fallback}],
apl_with_partition_nums(
get_apl_ann({Bucket, Key}, Nodes --
['[email protected]']), Size)),
?assertEqual([{{36,
'[email protected]'},
primary},
{{34,
'[email protected]'},
fallback},
{{35,
'[email protected]'},
fallback}],
apl_with_partition_nums(
get_apl_ann({Bucket, Key}, Nodes --
['[email protected]',
'[email protected]']), Size)),
?assertEqual([{{34,
'[email protected]'},
fallback},
{{35,
'[email protected]'},
fallback},
{{36,
'[email protected]'},
fallback}],
apl_with_partition_nums(
get_apl_ann({Bucket, Key}, Nodes --
['[email protected]',
'[email protected]',
'[email protected]']), Size)),
?assertEqual([{{34,
'[email protected]'},
fallback},
{{35,
'[email protected]'},
fallback},
{{36,
'[email protected]'},
fallback}],
apl_with_partition_nums(
get_apl_ann({Bucket, Key}, Nodes --
['[email protected]',
'[email protected]',
'[email protected]',
'[email protected]']), Size)),
?assertEqual([{{34,
'[email protected]'},
fallback},
{{35,
'[email protected]'},
fallback},
{{36,
'[email protected]'},
fallback}],
apl_with_partition_nums(
get_apl_ann({Bucket, Key}, Nodes --
['[email protected]',
'[email protected]',
'[email protected]',
'[email protected]',
'[email protected]']), Size)),
?assertEqual([{{34,
'[email protected]'},
primary},
{{35,
'[email protected]'},
primary},
{{36,
'[email protected]'},
fallback}],
apl_with_partition_nums(
get_apl_ann({Bucket, Key}, Nodes --
['[email protected]',
'[email protected]']), Size)),
riak_core_ring_manager:cleanup_ets(test),
ok.

chbin_test_() ->
{timeout, 180, fun chbin_test_scenario/0}.

Expand Down
Loading

0 comments on commit b4807c5

Please sign in to comment.