Skip to content

Commit

Permalink
Merge pull request #2 from sokal32/master
Browse files Browse the repository at this point in the history
added subscriber uniqueness check
  • Loading branch information
5HT committed Apr 29, 2015
2 parents c8693ab + 82e2bcb commit 8f7ca68
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 18 deletions.
3 changes: 2 additions & 1 deletion include/mqs.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@
broker = #amqp_params_network{},
'mod' = [],
'fun' = [],
'arg' = null}).
'arg' = null,
'pid' = null}).
41 changes: 24 additions & 17 deletions src/mqs_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,32 @@
-define(SERVER, ?MODULE).
-compile(export_all).

-record(state, { messages_handlers = null, handlers_pids = null}).
-record(state, { messages_handlers = null, handlers_pids = null, subscribers = []}).

init([]) -> {ok, #state{ messages_handlers = dict:new(), handlers_pids = dict:new() }}.
start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

handle_call({sub, Name, Args}, _From, State) ->
StartResult = mqs_worker:start(Args),
{Reply,NewState} = case StartResult of
{ok, Pid} ->
mqs:info(?MODULE,"Messages handler instance successfuly started: ~p", [Name]),
{ {ok, consumer_started},
State#state {
messages_handlers = dict:store(Name, {Pid, Args}, State#state.messages_handlers),
handlers_pids = dict:store(Pid, Name, State#state.handlers_pids) } };
_ ->
mqs:error(?MODULE,"Error while starting message handler instance: Name - ~p", [Name]),
{ {error, cant_start_consumer},
State }
end,
{reply, Reply, NewState};
case lists:member({Name,Args}, State#state.subscribers) of
true ->
{reply, {ok, consumer_exists}, State};
false ->
StartResult = mqs_worker:start(Args),
{Reply,NewState} = case StartResult of
{ok, Pid} ->
mqs:info(?MODULE,"Messages handler instance successfuly started: ~p", [Name]),
{ {ok, consumer_started},
State#state {
messages_handlers = dict:store(Name, {Pid, Args}, State#state.messages_handlers),
handlers_pids = dict:store(Pid, Name, State#state.handlers_pids),
subscribers = [{Name,Args}|State#state.subscribers] } };
_ ->
mqs:error(?MODULE,"Error while starting message handler instance: Name - ~p", [Name]),
{ {error, cant_start_consumer},
State }
end,
{reply, Reply, NewState}
end;

handle_call({pub, Name, Message}, _From, State) ->
handle_call({pub, Name, Message, #'P_basic'{}}, _From, State);
Expand All @@ -47,10 +53,11 @@ handle_call({unsub, Name}, _From, State) ->
error ->
mqs:error(?MODULE,"Unable to stop messages handler. Specified handler not found. Name - ~p", [Name]),
{ {error, consumer_not_found}, State };
{ok, {Pid, _}} ->
{ok, {Pid, Args}} ->
{ gen_server:call(Pid, stop),
State#state { messages_handlers = dict:erase(Name, State#state.messages_handlers),
handlers_pids = dict:erase(Pid, State#state.handlers_pids) } }
handlers_pids = dict:erase(Pid, State#state.handlers_pids),
subscribers = lists:delete({Name,Args}, State#state.subscribers) } }
end,
{reply, Reply, NewState};

Expand Down

0 comments on commit 8f7ca68

Please sign in to comment.