Skip to content

Commit

Permalink
add fns and test to get active preflist w/ partition nums, annotated,…
Browse files Browse the repository at this point in the history
… by bucket-key
  • Loading branch information
zeeshanlakhani committed Feb 23, 2015
1 parent 7898729 commit ae49081
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 71 deletions.
225 changes: 184 additions & 41 deletions src/riak_core_apl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@
%% -------------------------------------------------------------------
-module(riak_core_apl).
-export([active_owners/1, active_owners/2,
get_apl/3, get_apl/4, get_apl_ann/3, get_apl_ann/4,
get_apl/3, get_apl/4,
get_apl_ann/2, get_apl_ann/3, get_apl_ann/4,
get_apl_ann_with_pnum/1,
get_primary_apl/3, get_primary_apl/4,
get_primary_apl_chbin/4,
first_up/2, offline_owners/1, offline_owners/2
]).

-export_type([preflist/0, preflist_ann/0]).
-export_type([preflist/0, preflist_ann/0, preflist_with_pnum_ann/0]).

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
Expand All @@ -40,13 +43,18 @@
-type ring() :: riak_core_ring:riak_core_ring().
-type preflist() :: [{index(), node()}].
-type preflist_ann() :: [{{index(), node()}, primary|fallback}].
%% @type preflist_with_pnum_ann -
%% Annoated preflist where the partition value is an id/number
%% (0 to ring_size-1) instead of a hash.
-type preflist_with_pnum_ann() :: [{{ring_core_ring:partition_id(), node()},
primary|fallback}].
-type iterator() :: term().
-type chashbin() :: term().
-type docidx() :: chash:index().

%% Return preflist of all active primary nodes (with no
%% substituion of fallbacks). Used to simulate a
%% preflist with N=ring_size
%% @doc Return preflist of all active primary nodes (with no
%% substituion of fallbacks). Used to simulate a
%% preflist with N=ring_size.
-spec active_owners(atom()) -> preflist_ann().
active_owners(Service) ->
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
Expand All @@ -59,62 +67,81 @@ active_owners(Ring, UpNodes) ->
{Up, _Pangs} = check_up(Primaries, UpNodes1, [], []),
Up.

%% Get the active preflist taking account of which nodes are up
%% @doc Get the active preflist taking account of which nodes are up.
-spec get_apl(docidx(), n_val(), atom()) -> preflist().
get_apl(DocIdx, N, Service) ->
{ok, CHBin} = riak_core_ring_manager:get_chash_bin(),
get_apl_chbin(DocIdx, N, CHBin, riak_core_node_watcher:nodes(Service)).

%% Get the active preflist taking account of which nodes are up
%% for a given chash/upnodes list
%% @doc Get the active preflist taking account of which nodes are up
%% for a given chash/upnodes list.
-spec get_apl_chbin(docidx(), n_val(), chashbin:chashbin(), [node()]) -> preflist().
get_apl_chbin(DocIdx, N, CHBin, UpNodes) ->
[{Partition, Node} || {{Partition, Node}, _Type} <-
get_apl_ann_chbin(DocIdx, N, CHBin, UpNodes)].

%% Get the active preflist taking account of which nodes are up
%% for a given ring/upnodes list
%% @doc Get the active preflist taking account of which nodes are up
%% for a given ring/upnodes list.
-spec get_apl(docidx(), n_val(), ring(), [node()]) -> preflist().
get_apl(DocIdx, N, Ring, UpNodes) ->
[{Partition, Node} || {{Partition, Node}, _Type} <-
[{Partition, Node} || {{Partition, Node}, _Type} <-
get_apl_ann(DocIdx, N, Ring, UpNodes)].

%% Get the active preflist taking account of which nodes are up
%% and annotate each node with type of primary/fallback
%% @doc Get the active preflist taking account of which nodes are up for a given
%% chash/upnodes list and annotate each node with type of primary/fallback.
get_apl_ann(DocIdx, N, UpNodes) ->
{ok, CHBin} = riak_core_ring_manager:get_chash_bin(),
get_apl_ann_chbin(DocIdx, N, CHBin, UpNodes).

%% Get the active preflist taking account of which nodes are up
%% for a given chash/upnodes list and annotate each node with type of
%% primary/fallback
-spec get_apl_ann_chbin(binary(), n_val(), chashbin(), [node()]) -> preflist_ann().
get_apl_ann_chbin(DocIdx, N, CHBin, UpNodes) ->
UpNodes1 = UpNodes,
Itr = chashbin:iterator(DocIdx, CHBin),
{Primaries, Itr2} = chashbin:itr_pop(N, Itr),
{Up, Pangs} = check_up(Primaries, UpNodes1, [], []),
Up ++ find_fallbacks_chbin(Pangs, Itr2, UpNodes1, []).

%% Get the active preflist taking account of which nodes are up
%% for a given ring/upnodes list and annotate each node with type of
%% primary/fallback
%% @doc Get the active preflist taking account of which nodes are up
%% for a given ring/upnodes list and annotate each node with type of
%% primary/fallback.
-spec get_apl_ann(binary(), n_val(), ring(), [node()]) -> preflist_ann().
get_apl_ann(DocIdx, N, Ring, UpNodes) ->
UpNodes1 = UpNodes,
Preflist = riak_core_ring:preflist(DocIdx, Ring),

{Primaries, Fallbacks} = lists:split(N, Preflist),
{Up, Pangs} = check_up(Primaries, UpNodes1, [], []),
Up ++ find_fallbacks(Pangs, Fallbacks, UpNodes1, []).

%% Same as get_apl, but returns only the primaries.
%% @doc Get the active preflist for a given {bucket, key} and list of nodes
%% and annotate each node with type of primary/fallback.
-spec get_apl_ann(riak_core_bucket:bucket(), [node()]) -> preflist_ann().
get_apl_ann({Bucket, Key}, UpNodes) ->
BucketProps = riak_core_bucket:get_bucket(Bucket),
NVal = proplists:get_value(n_val, BucketProps),
DocIdx = riak_core_util:chash_key({Bucket, Key}),
get_apl_ann(DocIdx, NVal, UpNodes).

%% @doc Get the active preflist taking account of which nodes are up
%% for a given {bucket, key} and annotate each node with type of
%% primary/fallback
-spec get_apl_ann_with_pnum(riak_core_bucket:bucket()) -> preflist_with_pnum_ann().
get_apl_ann_with_pnum(BKey) ->
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
UpNodes = riak_core_ring:all_members(Ring),
Apl = get_apl_ann(BKey, UpNodes),
Size = riak_core_ring:num_partitions(Ring),
apl_with_partition_nums(Apl, Size).

%% @doc Get the active preflist taking account of which nodes are up
%% for a given chash/upnodes list and annotate each node with type of
%% primary/fallback.
-spec get_apl_ann_chbin(binary(), n_val(), chashbin(), [node()]) -> preflist_ann().
get_apl_ann_chbin(DocIdx, N, CHBin, UpNodes) ->
UpNodes1 = UpNodes,
Itr = chashbin:iterator(DocIdx, CHBin),
{Primaries, Itr2} = chashbin:itr_pop(N, Itr),
{Up, Pangs} = check_up(Primaries, UpNodes1, [], []),
Up ++ find_fallbacks_chbin(Pangs, Itr2, UpNodes1, []).

%% @doc Same as get_apl, but returns only the primaries.
-spec get_primary_apl(binary(), n_val(), atom()) -> preflist_ann().
get_primary_apl(DocIdx, N, Service) ->
{ok, CHBin} = riak_core_ring_manager:get_chash_bin(),
get_primary_apl_chbin(DocIdx, N, CHBin, riak_core_node_watcher:nodes(Service)).

%% Same as get_apl, but returns only the primaries.
%% @doc Same as get_apl, but returns only the primaries.
-spec get_primary_apl_chbin(binary(), n_val(), chashbin(), [node()]) -> preflist_ann().
get_primary_apl_chbin(DocIdx, N, CHBin, UpNodes) ->
UpNodes1 = UpNodes,
Expand All @@ -123,7 +150,7 @@ get_primary_apl_chbin(DocIdx, N, CHBin, UpNodes) ->
{Up, _} = check_up(Primaries, UpNodes1, [], []),
Up.

%% Same as get_apl, but returns only the primaries.
%% @doc Same as get_apl, but returns only the primaries.
-spec get_primary_apl(binary(), n_val(), ring(), [node()]) -> preflist_ann().
get_primary_apl(DocIdx, N, Ring, UpNodes) ->
UpNodes1 = UpNodes,
Expand All @@ -132,8 +159,8 @@ get_primary_apl(DocIdx, N, Ring, UpNodes) ->
{Up, _} = check_up(Primaries, UpNodes1, [], []),
Up.

%% Return the first entry that is up in the preflist for `DocIdx'. This
%% will crash if all owning nodes are offline.
%% @doc Return the first entry that is up in the preflist for `DocIdx'. This
%% will crash if all owning nodes are offline.
first_up(DocIdx, Service) ->
{ok, CHBin} = riak_core_ring_manager:get_chash_bin(),
Itr = chashbin:iterator(DocIdx, CHBin),
Expand All @@ -154,7 +181,7 @@ offline_owners(Service, CHBin) ->
end, CHBin),
DownVNodes.

%% Split a preference list into up and down lists
%% @doc Split a preference list into up and down lists.
-spec check_up(preflist(), [node()], preflist_ann(), preflist()) -> {preflist_ann(), preflist()}.
check_up([], _UpNodes, Up, Pangs) ->
{lists:reverse(Up), lists:reverse(Pangs)};
Expand All @@ -166,7 +193,7 @@ check_up([{Partition,Node}|Rest], UpNodes, Up, Pangs) ->
check_up(Rest, UpNodes, Up, [{Partition, Node} | Pangs])
end.

%% Find fallbacks for downed nodes in the preference list
%% @doc Find fallbacks for downed nodes in the preference list.
-spec find_fallbacks(preflist(), preflist(), [node()], preflist_ann()) -> preflist_ann().
find_fallbacks(_Pangs, [], _UpNodes, Secondaries) ->
lists:reverse(Secondaries);
Expand All @@ -181,7 +208,7 @@ find_fallbacks([{Partition, _Node}|Rest]=Pangs, [{_,FN}|Fallbacks], UpNodes, Sec
find_fallbacks(Pangs, Fallbacks, UpNodes, Secondaries)
end.

%% Find fallbacks for downed nodes in the preference list
%% @doc Find fallbacks for downed nodes in the preference list.
-spec find_fallbacks_chbin(preflist(), iterator(),[node()], preflist_ann()) -> preflist_ann().
find_fallbacks_chbin([], _Fallbacks, _UpNodes, Secondaries) ->
lists:reverse(Secondaries);
Expand All @@ -198,10 +225,17 @@ find_fallbacks_chbin([{Partition, _Node}|Rest]=Pangs, Itr, UpNodes, Secondaries)
find_fallbacks_chbin(Pangs, Itr2, UpNodes, Secondaries)
end.

%% Return true if a node is up
%% @doc Return true if a node is up.
is_up(Node, UpNodes) ->
lists:member(Node, UpNodes).

%% @doc Return annotated preflist with partition ids/nums instead of hashes.
-spec apl_with_partition_nums(preflist_ann(), riak_core_ring:ring_size()) ->
preflist_with_pnum_ann().
apl_with_partition_nums(Apl, Size) ->
[{{riak_core_ring_util:hash_to_partition_id(Hash, Size), Node}, Ann} ||
{{Hash, Node}, Ann} <- Apl].

-ifdef(TEST).

smallest_test() ->
Expand All @@ -213,12 +247,12 @@ four_node_test() ->
Ring = perfect_ring(8, Nodes),
?assertEqual([{0,nodea},
{182687704666362864775460604089535377456991567872,nodeb},
{365375409332725729550921208179070754913983135744,nodec}],
{365375409332725729550921208179070754913983135744,nodec}],
get_apl(last_in_ring(), 3, Ring, Nodes)),
%% With a node down
?assertEqual([{182687704666362864775460604089535377456991567872,nodeb},
{365375409332725729550921208179070754913983135744,nodec},
{0,noded}],
{0,noded}],
get_apl(last_in_ring(), 3, Ring, [nodeb, nodec, noded])),
%% With two nodes down
?assertEqual([{365375409332725729550921208179070754913983135744,nodec},
Expand All @@ -231,8 +265,7 @@ four_node_test() ->
{365375409332725729550921208179070754913983135744,nodea}],
get_apl(last_in_ring(), 3, Ring, [nodea, nodeb])).


%% Create a perfect ring - RingSize must be a multiple of nodes
%% @doc Create a perfect ring - RingSize must be a multiple of nodes
perfect_ring(RingSize, Nodes) when RingSize rem length(Nodes) =:= 0 ->
Ring = riak_core_ring:fresh(RingSize,node()),
Owners = riak_core_ring:all_owners(Ring),
Expand Down Expand Up @@ -326,6 +359,116 @@ six_node_test() ->

ok.

six_node_bucket_key_ann_test() ->
{ok, [Ring0]} = file:consult("../test/my_ring"),
Nodes = ['[email protected]', '[email protected]', '[email protected]',
'[email protected]', '[email protected]', '[email protected]'],
Ring = riak_core_ring:upgrade(Ring0),
Bucket = <<"favorite">>,
Key = <<"jethrotull">>,
application:set_env(riak_core, default_bucket_props,
[{n_val, 3},
{chash_keyfun,{riak_core_util,chash_std_keyfun}}]),
riak_core_ring_manager:setup_ets(test),
riak_core_ring_manager:set_ring_global(Ring),
Size = riak_core_ring:num_partitions(Ring),
?assertEqual([{{34,
'[email protected]'},
primary},
{{35,
'[email protected]'},
primary},
{{36,
'[email protected]'},
primary}],
apl_with_partition_nums(
get_apl_ann({Bucket, Key}, Nodes), Size)),
?assertEqual([{{35,
'[email protected]'},
primary},
{{36,
'[email protected]'},
primary},
{{34,
'[email protected]'},
fallback}],
apl_with_partition_nums(
get_apl_ann({Bucket, Key}, Nodes --
['[email protected]']), Size)),
?assertEqual([{{36,
'[email protected]'},
primary},
{{34,
'[email protected]'},
fallback},
{{35,
'[email protected]'},
fallback}],
apl_with_partition_nums(
get_apl_ann({Bucket, Key}, Nodes --
['[email protected]',
'[email protected]']), Size)),
?assertEqual([{{34,
'[email protected]'},
fallback},
{{35,
'[email protected]'},
fallback},
{{36,
'[email protected]'},
fallback}],
apl_with_partition_nums(
get_apl_ann({Bucket, Key}, Nodes --
['[email protected]',
'[email protected]',
'[email protected]']), Size)),
?assertEqual([{{34,
'[email protected]'},
fallback},
{{35,
'[email protected]'},
fallback},
{{36,
'[email protected]'},
fallback}],
apl_with_partition_nums(
get_apl_ann({Bucket, Key}, Nodes --
['[email protected]',
'[email protected]',
'[email protected]',
'[email protected]']), Size)),
?assertEqual([{{34,
'[email protected]'},
fallback},
{{35,
'[email protected]'},
fallback},
{{36,
'[email protected]'},
fallback}],
apl_with_partition_nums(
get_apl_ann({Bucket, Key}, Nodes --
['[email protected]',
'[email protected]',
'[email protected]',
'[email protected]',
'[email protected]']), Size)),
?assertEqual([{{34,
'[email protected]'},
primary},
{{35,
'[email protected]'},
primary},
{{36,
'[email protected]'},
fallback}],
apl_with_partition_nums(
get_apl_ann({Bucket, Key}, Nodes --
['[email protected]',
'[email protected]']), Size)),
riak_core_ring_manager:cleanup_ets(test),
ok.

chbin_test_() ->
{timeout, 180, fun chbin_test_scenario/0}.

Expand Down
6 changes: 5 additions & 1 deletion src/riak_core_bucket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,16 @@
name/1,
n_val/1]).

-export_type([bucket/0]).

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.

-define(METADATA_PREFIX, {core, buckets}).

-type bucket() :: binary() | {riak_core_bucket_type:bucket_type(), binary()}.

%% @doc Add a list of defaults to global list of defaults for new
%% buckets. If any item is in Items is already set in the
%% current defaults list, the new setting is omitted, and the old
Expand All @@ -55,7 +59,7 @@ append_bucket_defaults(Items) when is_list(Items) ->

%% @doc Set the given BucketProps in Bucket or {BucketType, Bucket}. If BucketType does not
%% exist, or is not active, {error, no_type} is returned.
-spec set_bucket(binary() | {riak_core_bucket_type:bucket_type(), binary()}, [{atom(), any()}]) ->
-spec set_bucket(bucket(), [{atom(), any()}]) ->
ok | {error, no_type | [{atom(), atom()}]}.
set_bucket({<<"default">>, Name}, BucketProps) ->
set_bucket(Name, BucketProps);
Expand Down
Loading

0 comments on commit ae49081

Please sign in to comment.