Skip to content

Commit

Permalink
Merge pull request #17 from cabol/v0.2.0
Browse files Browse the repository at this point in the history
Preparing v0.2.0.
  • Loading branch information
cabol authored Jun 16, 2016
2 parents f612ca0 + 3349d09 commit 233c7eb
Show file tree
Hide file tree
Showing 10 changed files with 161 additions and 95 deletions.
7 changes: 3 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
REBAR = $(shell which rebar3)

## Common variables
CONFIG ?= test/test.config
DEFAULT_PATH = ./_build/default
DEFAULT_BUILD_PATH = $(DEFAULT_PATH)/lib/*/ebin

## CT
CT_PATH = ./_build/test
CT_BUILD_PATH = $(CT_PATH)/lib/*/ebin
CT_SUITES = task_SUITE local_SUITE dist_SUITE
CT_OPTS = -cover test/cover.spec -erl_args -config ${CONFIG}
CT_OPTS = -cover test/cover.spec

.PHONY: all check_rebar compile clean distclean dialyze tests shell doc

Expand Down Expand Up @@ -46,8 +45,8 @@ tests: check_rebar
ct_run -dir test -suite $(CT_SUITES) -pa $(CT_BUILD_PATH) -logdir $(CT_PATH)/logs $(CT_OPTS)
rm -rf test/*.beam

shell: compile
erl -pa $(DEFAULT_BUILD_PATH) -s shards -config ${CONFIG}
shell:
$(REBAR) shell

edoc:
$(REBAR) edoc
16 changes: 10 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ the options. With `shards` there are additional options:
* `{restart_strategy, one_for_one | one_for_all}`: allows to configure the restart strategy for
`shards_owner_sup`. Default is `one_for_one`.

* `{auto_eject_nodes, boolean()}`: A boolean value that controls if node should be ejected
when it fails. – Default is `true`.

* `{pick_shard_fun, pick_shard_fun()}`: Function to pick the **shard** on which the `key`
will be handled locally – used by `shards_local`. See the spec [HERE](https://github.com/cabol/shards/blob/master/src/shards_local.erl#L145).

Expand All @@ -78,10 +81,10 @@ Besides, the `shards:new/2` function returns a tuple of two elements:
```

The first element is the name of the created table; `mytab1`. And the second one is the
[State](./src/shards_local.erl#L159): `{4, #Fun<shards_local.pick_shard.3>, set}`.
[State](./src/shards_local.erl#L189-L205): `{4, #Fun<shards_local.pick_shard.3>, set}`.
We'll talk about the **State** later, and see how it can be used.

> **NOTE:** For more information about `shards:new/2` go [HERE](./src/shards_local.erl#L796).
> **NOTE:** For more information about `shards:new/2` see [shards](./src/shards.erl).
Let's continue:

Expand Down Expand Up @@ -166,13 +169,13 @@ Extremely simple isn't?

The module `shards` is a wrapper on top of two main modules:

* `shards_local`: Implements Sharding on top of ETS tables, but locally (on a single Erlang node).
* `shards_dist`: Implements Sharding but across multiple distributed Erlang nodes, which must
* [shards_local](./src/shards_local.erl): Implements Sharding on top of ETS tables, but locally (on a single Erlang node).
* [shards_dist](./src/shards_dist.erl): Implements Sharding but across multiple distributed Erlang nodes, which must
run `shards` locally, since `shards_dist` uses `shards_local` internally. We'll cover
the distributed part later.

When you use `shards` on top of `shards_local`, a call to the control ETS table owned by `shards_owner_sup`
must be done, in order to recover the [State](./src/shards_local.erl#L159), mentioned previously.
must be done, in order to recover the [State](./src/shards_local.erl#L189-L205), mentioned previously.
Most of the `shards_local` functions receives the **State** as parameter, so it must be fetched before
to call it. You can check how `shards` module is implemented [HERE](./src/shards.erl).

Expand Down Expand Up @@ -234,7 +237,8 @@ $ erl -sname c@localhost -pa _build/default/lib/*/ebin -s shards
% when a tables is created with {scope, g}, the module shards_dist is used
% internally by shards
> shards:new(mytab, [{n_shards, 4}, {scope, g}]).
{mytab,{4,#Fun<shards_local.pick_shard.3>,set}}
{mytab,{{4,#Fun<shards_local.pick_shard.3>,set},
{#Fun<shards_dist.pick_node.3>,true}}}
```

**3.** Setup the `shards` cluster.
Expand Down
4 changes: 4 additions & 0 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,7 @@
{"(linux|darwin|solaris)", clean, "make -C c_src clean"},
{"(freebsd)", clean, "gmake -C c_src clean"}
]}.

%% == Shell ==

{shell, [{apps, [shards]}]}.
5 changes: 3 additions & 2 deletions src/shards.app.src
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{application, shards, [
{description, "ETS with Sharding support."},
{vsn, "0.1.0"},
{vsn, "0.2.0"},
{registered, []},
{mod, {shards, []}},
{applications, [kernel, stdlib]},
Expand All @@ -10,6 +10,7 @@
{licenses, ["MIT"]},
{links, [
{"GitHub", "https://github.com/cabol/shards"},
{"Docs", "http://cabol.github.io/posts/2016/04/14/sharding-support-for-ets.html"}
{"Doc", "http://cabol.github.io/shards"},
{"Blog Post", "http://cabol.github.io/posts/2016/04/14/sharding-support-for-ets.html"}
]}
]}.
58 changes: 35 additions & 23 deletions src/shards_dist.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,19 @@
-type pick_node_fun() :: shards_local:pick_node_fun().

%% @type state() = {
%% PickNode :: pick_node_fun(),
%% AutoEject :: boolean()
%% PickNode :: pick_node_fun(),
%% AutoEjectNodes :: boolean()
%% }.
%%
%% Defines the `shards' distributed state:
%% <ul>
%% <li>`PickNode': Function callback to pick/compute the node.</li>
%% <li>`TableType': Table type.</li>
%% <li>`AutoEjectNodes': A boolean value that controls if node should be
%% ejected when it fails.</li>
%% </ul>
-type state() :: {
PickNode :: pick_node_fun(),
AutoEject :: boolean()
PickNode :: pick_node_fun(),
AutoEjectNodes :: boolean()
}.

-export_type([
Expand Down Expand Up @@ -125,7 +126,7 @@ pick_node(_, Key, Nodes) ->

-spec delete(Tab :: atom()) -> true.
delete(Tab) ->
mapred(Tab, {?SHARDS, delete, [Tab]}, nil, nil, delete),
mapred(Tab, {?SHARDS, delete, [Tab]}, nil, state(Tab), delete),
true.

-spec delete(Tab, Key, State) -> true when
Expand Down Expand Up @@ -163,10 +164,10 @@ insert(Tab, ObjOrObjL, State) when is_list(ObjOrObjL) ->
lists:foreach(fun(Object) ->
true = insert(Tab, Object, State)
end, ObjOrObjL), true;
insert(Tab, ObjOrObjL, {Local, {PickNode, _}}) when is_tuple(ObjOrObjL) ->
insert(Tab, ObjOrObjL, {Local, {PickNode, AutoEject}}) when is_tuple(ObjOrObjL) ->
[Key | _] = tuple_to_list(ObjOrObjL),
Node = PickNode(write, Key, get_nodes(Tab)),
rpc_call(Node, ?SHARDS, insert, [Tab, ObjOrObjL, Local]).
rpc_call(Node, {?SHARDS, insert, [Tab, ObjOrObjL, Local]}, Tab, AutoEject).

-spec insert_new(Tab, ObjOrObjL, State) -> Result when
Tab :: atom(),
Expand All @@ -177,7 +178,7 @@ insert_new(Tab, ObjOrObjL, State) when is_list(ObjOrObjL) ->
lists:foldr(fun(Object, Acc) ->
[insert_new(Tab, Object, State) | Acc]
end, [], ObjOrObjL);
insert_new(Tab, ObjOrObjL, {Local, {PickNode, _} = Dist}) when is_tuple(ObjOrObjL) ->
insert_new(Tab, ObjOrObjL, {Local, {PickNode, AutoEject} = Dist}) when is_tuple(ObjOrObjL) ->
[Key | _] = tuple_to_list(ObjOrObjL),
Nodes = get_nodes(Tab),
case PickNode(read, Key, Nodes) of
Expand All @@ -187,13 +188,13 @@ insert_new(Tab, ObjOrObjL, {Local, {PickNode, _} = Dist}) when is_tuple(ObjOrObj
case mapred(Tab, Map, Reduce, Dist, read) of
[] ->
Node = PickNode(write, Key, Nodes),
rpc_call(Node, ?SHARDS, insert_new, [Tab, ObjOrObjL, Local]);
rpc_call(Node, {?SHARDS, insert_new, [Tab, ObjOrObjL, Local]}, Tab, AutoEject);
_ ->
false
end;
_ ->
Node = PickNode(write, Key, Nodes),
rpc_call(Node, ?SHARDS, insert_new, [Tab, ObjOrObjL, Local])
rpc_call(Node, {?SHARDS, insert_new, [Tab, ObjOrObjL, Local]}, Tab, AutoEject)
end.

-spec lookup(Tab, Key, State) -> Result when
Expand Down Expand Up @@ -232,7 +233,7 @@ lookup_element(Tab, Key, Pos, {Local, {PickNode, _} = Dist}) ->
-spec match(Tab, Pattern, State) -> [Match] when
Tab :: atom(),
Pattern :: ets:match_pattern(),
State :: state(),
State :: shards:state(),
Match :: [term()].
match(Tab, Pattern, {Local, Dist}) ->
Map = {?SHARDS, match, [Tab, Pattern, Local]},
Expand All @@ -242,7 +243,7 @@ match(Tab, Pattern, {Local, Dist}) ->
-spec match_delete(Tab, Pattern, State) -> true when
Tab :: atom(),
Pattern :: ets:match_pattern(),
State :: state().
State :: shards:state().
match_delete(Tab, Pattern, {Local, Dist}) ->
Map = {?SHARDS, match_delete, [Tab, Pattern, Local]},
Reduce = {fun(Res, Acc) -> Acc and Res end, true},
Expand All @@ -251,7 +252,7 @@ match_delete(Tab, Pattern, {Local, Dist}) ->
-spec match_object(Tab, Pattern, State) -> [Object] when
Tab :: atom(),
Pattern :: ets:match_pattern(),
State :: state(),
State :: shards:state(),
Object :: tuple().
match_object(Tab, Pattern, {Local, Dist}) ->
Map = {?SHARDS, match_object, [Tab, Pattern, Local]},
Expand Down Expand Up @@ -282,7 +283,7 @@ new(Name, Options) ->
-spec select(Tab, MatchSpec, State) -> [Match] when
Tab :: atom(),
MatchSpec :: ets:match_spec(),
State :: state(),
State :: shards:state(),
Match :: term().
select(Tab, MatchSpec, {Local, Dist}) ->
Map = {?SHARDS, select, [Tab, MatchSpec, Local]},
Expand All @@ -292,7 +293,7 @@ select(Tab, MatchSpec, {Local, Dist}) ->
-spec select_count(Tab, MatchSpec, State) -> NumMatched when
Tab :: atom(),
MatchSpec :: ets:match_spec(),
State :: state(),
State :: shards:state(),
NumMatched :: non_neg_integer().
select_count(Tab, MatchSpec, {Local, Dist}) ->
Map = {?SHARDS, select_count, [Tab, MatchSpec, Local]},
Expand All @@ -302,7 +303,7 @@ select_count(Tab, MatchSpec, {Local, Dist}) ->
-spec select_delete(Tab, MatchSpec, State) -> NumDeleted when
Tab :: atom(),
MatchSpec :: ets:match_spec(),
State :: state(),
State :: shards:state(),
NumDeleted :: non_neg_integer().
select_delete(Tab, MatchSpec, {Local, Dist}) ->
Map = {?SHARDS, select_delete, [Tab, MatchSpec, Local]},
Expand All @@ -312,7 +313,7 @@ select_delete(Tab, MatchSpec, {Local, Dist}) ->
-spec select_reverse(Tab, MatchSpec, State) -> [Match] when
Tab :: atom(),
MatchSpec :: ets:match_spec(),
State :: state(),
State :: shards:state(),
Match :: term().
select_reverse(Tab, MatchSpec, {Local, Dist}) ->
Map = {?SHARDS, select_reverse, [Tab, MatchSpec, Local]},
Expand Down Expand Up @@ -351,12 +352,23 @@ state(TabName) ->
%%%===================================================================

%% @private
rpc_call(Node, Module, Function, Args) ->
rpc_call(Node, {Module, Function, Args}, Tab, AutoEject) ->
case rpc:call(Node, Module, Function, Args) of
{badrpc, _} -> throw(unexpected_error); % @TODO: call GC to remove this node
Response -> Response
{badrpc, _} ->
% unexpected to get here
maybe_eject_node(Node, Tab, AutoEject),
throw({unexpected_error, {badrpc, Node}});
Response ->
Response
end.

%% @private
maybe_eject_node(Node, Tab, true) ->
leave(Tab, [Node]),
ok;
maybe_eject_node(_, _, _) ->
ok.

%% @private
mapred(Tab, Map, Reduce, State, Op) ->
mapred(Tab, nil, Map, Reduce, State, Op).
Expand All @@ -366,12 +378,12 @@ mapred(Tab, Key, Map, nil, State, Op) ->
mapred(Tab, Key, Map, fun(E, Acc) -> [E | Acc] end, State, Op);
mapred(Tab, nil, Map, Reduce, _, _) ->
p_mapred(Tab, Map, Reduce);
mapred(Tab, Key, {MapMod, MapFun, MapArgs} = Map, Reduce, {PickNode, _}, Op) ->
mapred(Tab, Key, Map, Reduce, {PickNode, AutoEject}, Op) ->
case PickNode(Op, Key, get_nodes(Tab)) of
any ->
p_mapred(Tab, Map, Reduce);
Node ->
rpc_call(Node, MapMod, MapFun, MapArgs)
rpc_call(Node, Map, Tab, AutoEject)
end.

%% @private
Expand Down
80 changes: 43 additions & 37 deletions src/shards_local.erl
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,43 @@
%% Defines spec function to pick or compute the node.
-type pick_node_fun() :: fun((operation_t(), key(), [node()]) -> node()) | any.

%% @type tweaks() = {write_concurrency, boolean()}
%% | {read_concurrency, boolean()}
%% | compressed.
%%
%% ETS tweaks option
-type tweaks() :: {write_concurrency, boolean()}
| {read_concurrency, boolean()}
| compressed.

%% @type shards_opt() = {scope, l | g}
%% | {n_shards, pos_integer()}
%% | {pick_shard_fun, pick_shard_fun()}
%% | {pick_node_fun, pick_node_fun()}
%% | {restart_strategy, one_for_one | one_for_all}
%% | {eject_nodes_on_failure, boolean()}.
%%
%% Shards extended options.
-type shards_opt() :: {scope, l | g}
| {n_shards, pos_integer()}
| {pick_shard_fun, pick_shard_fun()}
| {pick_node_fun, pick_node_fun()}
| {restart_strategy, one_for_one | one_for_all}
| {auto_eject_nodes, boolean()}.

%% @type option() = ets:type() | ets:access() | named_table
%% | {keypos, pos_integer()}
%% | {heir, pid(), HeirData :: term()}
%% | {heir, none} | tweaks()
%% | shards_opt().
%%
%% Create table options – used by `new/2'.
-type option() :: ets:type() | ets:access() | named_table
| {keypos, pos_integer()}
| {heir, pid(), HeirData :: term()}
| {heir, none} | tweaks()
| shards_opt().

%% @type state() = {
%% NumShards :: pos_integer(),
%% PickShard :: pick_shard_fun(),
Expand Down Expand Up @@ -191,41 +228,6 @@
Continuation :: ets:continuation()
}.

%% @type tweaks() = {write_concurrency, boolean()}
%% | {read_concurrency, boolean()}
%% | compressed.
%%
%% ETS tweaks option
-type tweaks() :: {write_concurrency, boolean()}
| {read_concurrency, boolean()}
| compressed.

%% @type shards_opt() = {scope, l | g}
%% | {n_shards, pos_integer()}
%% | {pick_shard_fun, pick_shard_fun()}
%% | {pick_node_fun, pick_node_fun()}
%% | {restart_strategy, one_for_one | one_for_all}.
%%
%% Shards extended options.
-type shards_opt() :: {scope, l | g}
| {n_shards, pos_integer()}
| {pick_shard_fun, pick_shard_fun()}
| {pick_node_fun, pick_node_fun()}
| {restart_strategy, one_for_one | one_for_all}.

%% @type option() = ets:type() | ets:access() | named_table
%% | {keypos, pos_integer()}
%% | {heir, pid(), HeirData :: term()}
%% | {heir, none} | tweaks()
%% | shards_opt().
%%
%% Create table options – used by `new/2'.
-type option() :: ets:type() | ets:access() | named_table
| {keypos, pos_integer()}
| {heir, pid(), HeirData :: term()}
| {heir, none} | tweaks()
| shards_opt().

%% Exported types
-export_type([
operation_t/0,
Expand Down Expand Up @@ -309,7 +311,7 @@ file2tab(Filenames) ->
Options :: [Option],
Option :: {verify, boolean()},
Reason :: term(),
Response :: [{Tab, state()} | {error, Reason}].
Response :: {ok, Tab} | {error, Reason}.
file2tab(Filenames, Options) ->
try
ShardTabs = [{First, _} | _] = [begin
Expand All @@ -322,7 +324,11 @@ file2tab(Filenames, Options) ->
end
end || FN <- Filenames],
Tab = name_from_shard(First),
new(Tab, [{restore, ShardTabs, Options}, {n_shards, length(Filenames)}])
{Tab, _} = new(Tab, [
{restore, ShardTabs, Options},
{n_shards, length(Filenames)}
]),
{ok, Tab}
catch
_:Error -> Error
end.
Expand Down
Loading

0 comments on commit 233c7eb

Please sign in to comment.