Skip to content

Commit

Permalink
[#38] Improve test coverage (100%) and general enhancements
Browse files Browse the repository at this point in the history
  • Loading branch information
cabol committed Jan 9, 2018
1 parent f18a6b5 commit 1603e56
Show file tree
Hide file tree
Showing 17 changed files with 333 additions and 283 deletions.
16 changes: 7 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ Therefore, one of the most common and proven strategies to deal with these probl
is [Sharding/Partitioning](https://en.wikipedia.org/wiki/Partition_(database))
the principle is pretty similar to [DHTs](https://en.wikipedia.org/wiki/Distributed_hash_table).

Here is where **Shards** comes in. **Shards** makes it extremely easy to achieve
all this, with **zero** effort. It provides an API compatible with [ETS](http://erlang.org/doc/man/ets.html) – with few exceptions. You can check
Here is where **Shards** comes in. **Shards** makes very easy to achieve
all these features with **zero** effort. It provides an API compatible with
[ETS](http://erlang.org/doc/man/ets.html) – with few exceptions. You can check
the list of compatible ETS functions that **Shards** provides [HERE](https://github.com/cabol/shards/issues/1).


## Usage
## Installation

### Erlang

Expand Down Expand Up @@ -421,7 +422,9 @@ And again, let's check it out from any node:
basic functions have been implemented.


## Examples and/or Projects using Shards
## Important links

* [Blog Post – including load tests results](http://cabol.github.io/posts/2016/04/14/sharding-support-for-ets.html).

* [ExShards](https://github.com/cabol/ex_shards) – Elixir wrapper for
`shards` with extra and nicer functions.
Expand All @@ -432,9 +435,6 @@ And again, let's check it out from any node:
* [KVX](https://github.com/cabol/kvx) – Simple/basic Elixir in-memory Key/Value
Store using `shards` (default adapter).

* [ErlBus](https://github.com/cabol/erlbus) uses `shards` to scale-out
Topics/Pids table(s), which can be too large and with high concurrency level.

* [Cacherl](https://github.com/ferigis/cacherl) uses `shards` to implement a
Distributed Cache.

Expand All @@ -445,8 +445,6 @@ And again, let's check it out from any node:

You can find tests results in `_build/test/logs`, and coverage in `_build/test/cover`.

> **NOTE:** You can find some performance tests in this [BLOG POST](http://cabol.github.io/posts/2016/04/14/sharding-support-for-ets.html).

## Building Edoc

Expand Down
10 changes: 10 additions & 0 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

{cover_enabled, true}.

{cover_opts, [verbose]}.

%% == Common Test ==

{ct_compile_opts, [
Expand Down Expand Up @@ -54,6 +56,14 @@
{deps, [
{jchash, "0.1.0"},
{mixer, {git, "https://github.com/chef/mixer.git", {branch, "master"}}}
]},
{cover_excl_mods, [
shards_test_helpers,
shards_task_SUITE,
shards_state_SUITE,
shards_local_SUITE,
shards_lib_SUITE,
shards_dist_SUITE
]}
]}
]}.
Expand Down
9 changes: 6 additions & 3 deletions src/shards.app.src
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
{application, shards, [
{description, "ETS with Sharding support."},
{vsn, "0.5.0"},
{description, "Sharding support for ETS tables out-of-box"},
{vsn, "0.5.1"},
{registered, []},
{mod, {shards_app, []}},
{applications, [kernel, stdlib]},
{applications, [
kernel,
stdlib
]},
{env,[]},
{modules, []},
{maintainers, ["Carlos A Bolanos"]},
Expand Down
27 changes: 19 additions & 8 deletions src/shards_dist.erl
Original file line number Diff line number Diff line change
Expand Up @@ -65,39 +65,43 @@
%%% Extended API
%%%===================================================================

-spec join(Tab, Nodes) -> JoinedNodes when
Tab :: atom(),
Nodes :: [node()],
JoinedNodes :: [node()].
-spec join(Tab, Nodes) -> CurrentNodes when
Tab :: atom(),
Nodes :: [node()],
CurrentNodes :: [node()].
join(Tab, Nodes) ->
FilteredNodes =
lists:filter(fun(Node) ->
not lists:member(Node, get_nodes(Tab))
end, Nodes),

Fun =
fun() ->
rpc:multicall(FilteredNodes, erlang, apply, [fun join_/1, [Tab]])
end,

_ = global:trans({?MODULE, Tab}, Fun),
get_nodes(Tab).

%% @private
join_(Tab) ->
pg2:join(Tab, shards_lib:get_pid(Tab)).

-spec leave(Tab, Nodes) -> LeavedNodes when
Tab :: atom(),
Nodes :: [node()],
LeavedNodes :: [node()].
-spec leave(Tab, Nodes) -> CurrentNodes when
Tab :: atom(),
Nodes :: [node()],
CurrentNodes :: [node()].
leave(Tab, Nodes) ->
Members = [{node(Pid), Pid} || Pid <- pg2:get_members(Tab)],

ok =
lists:foreach(fun(Node) ->
case lists:keyfind(Node, 1, Members) of
{Node, Pid} -> pg2:leave(Tab, Pid);
_ -> noop
end
end, Nodes),

get_nodes(Tab).

-spec get_nodes(Tab) -> Nodes when
Expand Down Expand Up @@ -155,6 +159,7 @@ file2tab(Filename, Options) when ?is_filename(Filename) ->
StrFilename = shards_lib:to_string(Filename),
try
{Tab, Nodes} = tabfile_info_local(StrFilename),

Res =
shards_lib:reduce_while(fun(Node, Acc) ->
NodeFilename = shards_lib:to_string(Node) ++ "." ++ StrFilename,
Expand All @@ -166,6 +171,7 @@ file2tab(Filename, Options) when ?is_filename(Filename) ->
{halt, E}
end
end, [], Nodes),

case Res of
{error, _} = ResErr ->
ResErr;
Expand Down Expand Up @@ -291,11 +297,13 @@ lookup_element(Tab, Key, Pos, State) ->
case pick_node(PickNodeFun, Key, Nodes, r) of
any ->
Map = {?SHARDS, lookup_element, [Tab, Key, Pos, State]},

Filter =
lists:filter(fun
({badrpc, {'EXIT', _}}) -> false;
(_) -> true
end, mapred(Tab, Map, nil, State, r)),

case Filter of
[] -> error(badarg);
_ -> lists:append(Filter)
Expand Down Expand Up @@ -459,6 +467,7 @@ tabfile_info(Filename) when ?is_filename(Filename) ->
StrFilename = shards_lib:to_string(Filename),
try
{_Tab, Nodes} = tabfile_info_local(StrFilename),

TabInfoList =
shards_lib:reduce_while(fun(Node, Acc) ->
NodeFilename = shards_lib:to_string(Node) ++ "." ++ StrFilename,
Expand All @@ -469,6 +478,7 @@ tabfile_info(Filename) when ?is_filename(Filename) ->
{halt, E}
end
end, [], Nodes),

case TabInfoList of
{error, _} = ResErr ->
ResErr;
Expand Down Expand Up @@ -567,6 +577,7 @@ p_mapred(Tab, {MapMod, MapFun, MapArgs}, {RedFun, AccIn}) ->
rpc:call(Node, MapMod, MapFun, MapArgs)
end), [AsyncTask | Acc]
end, [], get_nodes(Tab)),

lists:foldl(fun(Task, Acc) ->
MapRes = shards_task:await(Task),
RedFun(MapRes, Acc)
Expand Down
2 changes: 2 additions & 0 deletions src/shards_lib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ iterator(StateOrNumShards) ->
false when is_integer(StateOrNumShards) ->
StateOrNumShards
end,

lists:seq(0, N - 1).

%% @doc
Expand Down Expand Up @@ -147,6 +148,7 @@ keyupdate(Fun, Keys, Init, KVList1) when is_function(Fun, 2) ->
{Key, Value} -> {Key, Fun(Key, Value)};
false -> {Key, Init}
end,

lists:keystore(Key, 1, Acc, NewKV)
end, KVList1, Keys).

Expand Down
92 changes: 53 additions & 39 deletions src/shards_local.erl
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,14 @@
%%% Types & Macros
%%%===================================================================

%% ETS Types
-type access() :: public | protected | private.
-type tab() :: atom().
-type type() :: set | ordered_set | bag | duplicate_bag.
-type cont() :: '$end_of_table'
| {tab(), integer(), integer(), ets:comp_match_spec(), list(), integer()}
| {tab(), _, _, integer(), ets:comp_match_spec(), list(), integer(), integer()}.

%% @type tweaks() = {write_concurrency, boolean()}
%% | {read_concurrency, boolean()}
%% | compressed.
Expand All @@ -131,14 +139,14 @@
| {pick_node_fun, shards_state:pick_fun()}
| {restart_strategy, one_for_one | one_for_all}.

%% @type option() = ets:type() | ets:access() | named_table
%% @type option() = type() | access() | named_table
%% | {keypos, pos_integer()}
%% | {heir, pid(), HeirData :: term()}
%% | {heir, none} | tweaks()
%% | shards_opt().
%%
%% Create table options – used by `new/2'.
-type option() :: ets:type() | ets:access() | named_table
-type option() :: type() | access() | named_table
| {keypos, pos_integer()}
| {heir, pid(), HeirData :: term()}
| {heir, none} | tweaks()
Expand All @@ -153,9 +161,9 @@
| {named_table, boolean()}
| {node, node()}
| {owner, pid()}
| {protection, ets:access()}
| {protection, access()}
| {size, non_neg_integer()}
| {type, ets:type()}
| {type, type()}
| {write_concurrency, boolean()}
| {read_concurrency, boolean()}
| {shards, [atom()]}.
Expand All @@ -169,8 +177,8 @@

%% ETS TabInfo Item
-type tabinfo_item() :: {name, atom()}
| {type, ets:type()}
| {protection, ets:access()}
| {type, type()}
| {protection, access()}
| {named_table, boolean()}
| {keypos, non_neg_integer()}
| {size, non_neg_integer()}
Expand All @@ -183,7 +191,7 @@
%% MatchSpec :: ets:match_spec(),
%% Limit :: pos_integer(),
%% Shard :: non_neg_integer(),
%% Continuation :: ets:continuation()
%% Continuation :: cont()
%% }.
%%
%% Defines the convention to `ets:select/1,3' continuation:
Expand All @@ -199,7 +207,7 @@
MatchSpec :: ets:match_spec(),
Limit :: pos_integer(),
Shard :: non_neg_integer(),
Continuation :: ets:continuation()
Continuation :: cont()
}.

%% @type filename() = string() | binary() | atom().
Expand Down Expand Up @@ -619,29 +627,32 @@ lookup_element(Tab, Key, Pos) ->
lookup_element(Tab, Key, Pos, State) ->
N = shards_state:n_shards(State),
PickShardFun = shards_state:pick_shard_fun(State),
case PickShardFun(Key, N, r) of
any ->
LookupElem =
fun(Tx, Kx, Px) ->
try
ets:lookup_element(Tx, Kx, Px)
catch
error:badarg -> {error, notfound}
end
end,
Filter =
lists:filter(fun
({error, notfound}) -> false;
(_) -> true
end, mapred(Tab, {LookupElem, [Key, Pos]}, State)),
case Filter of
[] -> error(badarg);
_ -> lists:append(Filter)
end;
Shard ->
ShardName = shards_lib:shard_name(Tab, Shard),
ets:lookup_element(ShardName, Key, Pos)
end.
lookup_element(Tab, PickShardFun(Key, N, r), Key, Pos, State).

%% @private
lookup_element(Tab, any, Key, Pos, State) ->
LookupElem =
fun(Tx, Kx, Px) ->
try
ets:lookup_element(Tx, Kx, Px)
catch
error:badarg -> {error, notfound}
end
end,

Filter =
lists:filter(fun
({error, notfound}) -> false;
(_) -> true
end, mapred(Tab, {LookupElem, [Key, Pos]}, State)),

case Filter of
[] -> error(badarg);
_ -> lists:append(Filter)
end;
lookup_element(Tab, Shard, Key, Pos, _State) ->
ShardName = shards_lib:shard_name(Tab, Shard),
ets:lookup_element(ShardName, Key, Pos).

match(Tab, Pattern) ->
match(Tab, Pattern, shards_state:new()).
Expand Down Expand Up @@ -941,12 +952,11 @@ rename(Tab, Name) ->
Name :: atom(),
State :: shards_state:state().
rename(Tab, Name, State) ->
ok =
lists:foreach(fun(Shard) ->
ShardName = shards_lib:shard_name(Tab, Shard),
NewShardName = shards_lib:shard_name(Name, Shard),
NewShardName = do_rename(ShardName, NewShardName)
end, shards_lib:iterator(State)),
ok = lists:foreach(fun(Shard) ->
ShardName = shards_lib:shard_name(Tab, Shard),
NewShardName = shards_lib:shard_name(Name, Shard),
NewShardName = do_rename(ShardName, NewShardName)
end, shards_lib:iterator(State)),
do_rename(Tab, Name).

%% @private
Expand Down Expand Up @@ -1202,13 +1212,15 @@ tab2file(Tab, Filename, State) ->
Response :: ok | {error, Reason :: term()}.
tab2file(Tab, Filename, Options, State) when ?is_filename(Filename) ->
StrFilename = shards_lib:to_string(Filename),

{Nodes, NewOpts} =
case lists:keytake(nodes, 1, Options) of
{value, {nodes, Val}, Opts1} ->
{Val, Opts1};
_ ->
{[node()], Options}
end,

ShardFilenamePairs =
shards_lib:reduce_while(fun(Shard, Acc) ->
ShardName = shards_lib:shard_name(Tab, Shard),
Expand All @@ -1220,6 +1232,7 @@ tab2file(Tab, Filename, Options, State) when ?is_filename(Filename) ->
{halt, Error}
end
end, [], shards_lib:iterator(State)),

case ShardFilenamePairs of
{error, _} = Error ->
Error;
Expand Down Expand Up @@ -1433,6 +1446,7 @@ p_mapred(Tab, NumShards, {MapFun, Args}, {ReduceFun, AccIn}) ->
apply(MapFun, [shards_lib:shard_name(Tab, Shard) | Args])
end), [AsyncTask | Acc]
end, [], shards_lib:iterator(NumShards)),

lists:foldl(fun(Task, Acc) ->
MapRes = shards_task:await(Task),
ReduceFun(MapRes, Acc)
Expand All @@ -1448,8 +1462,8 @@ mapred_funs(MapFun, ReduceFun) ->
true -> {MapFun, []};
_ -> MapFun
end,
Reduce = {ReduceFun, []},
{Map, Reduce}.

{Map, {ReduceFun, []}}.

%% @private
fold(Tab, NumShards, Fold, [Fun, Acc]) ->
Expand Down
Loading

0 comments on commit 1603e56

Please sign in to comment.