diff --git a/include/mqs.hrl b/include/mqs.hrl index 02dfb1e..f2d3340 100644 --- a/include/mqs.hrl +++ b/include/mqs.hrl @@ -11,4 +11,5 @@ broker = #amqp_params_network{}, 'mod' = [], 'fun' = [], - 'arg' = null}). + 'arg' = null, + 'pid' = null}). diff --git a/src/mqs_manager.erl b/src/mqs_manager.erl index 9093a20..9857064 100644 --- a/src/mqs_manager.erl +++ b/src/mqs_manager.erl @@ -6,26 +6,32 @@ -define(SERVER, ?MODULE). -compile(export_all). --record(state, { messages_handlers = null, handlers_pids = null}). +-record(state, { messages_handlers = null, handlers_pids = null, subscribers = []}). init([]) -> {ok, #state{ messages_handlers = dict:new(), handlers_pids = dict:new() }}. start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). handle_call({sub, Name, Args}, _From, State) -> - StartResult = mqs_worker:start(Args), - {Reply,NewState} = case StartResult of - {ok, Pid} -> - mqs:info(?MODULE,"Messages handler instance successfuly started: ~p", [Name]), - { {ok, consumer_started}, - State#state { - messages_handlers = dict:store(Name, {Pid, Args}, State#state.messages_handlers), - handlers_pids = dict:store(Pid, Name, State#state.handlers_pids) } }; - _ -> - mqs:error(?MODULE,"Error while starting message handler instance: Name - ~p", [Name]), - { {error, cant_start_consumer}, - State } - end, - {reply, Reply, NewState}; + case lists:member({Name,Args}, State#state.subscribers) of + true -> + {reply, {ok, consumer_exists}, State}; + false -> + StartResult = mqs_worker:start(Args), + {Reply,NewState} = case StartResult of + {ok, Pid} -> + mqs:info(?MODULE,"Messages handler instance successfuly started: ~p", [Name]), + { {ok, consumer_started}, + State#state { + messages_handlers = dict:store(Name, {Pid, Args}, State#state.messages_handlers), + handlers_pids = dict:store(Pid, Name, State#state.handlers_pids), + subscribers = [{Name,Args}|State#state.subscribers] } }; + _ -> + mqs:error(?MODULE,"Error while starting message handler instance: Name - ~p", [Name]), + { {error, cant_start_consumer}, + State } + end, + {reply, Reply, NewState} + end; handle_call({pub, Name, Message}, _From, State) -> handle_call({pub, Name, Message, #'P_basic'{}}, _From, State); @@ -47,10 +53,11 @@ handle_call({unsub, Name}, _From, State) -> error -> mqs:error(?MODULE,"Unable to stop messages handler. Specified handler not found. Name - ~p", [Name]), { {error, consumer_not_found}, State }; - {ok, {Pid, _}} -> + {ok, {Pid, Args}} -> { gen_server:call(Pid, stop), State#state { messages_handlers = dict:erase(Name, State#state.messages_handlers), - handlers_pids = dict:erase(Pid, State#state.handlers_pids) } } + handlers_pids = dict:erase(Pid, State#state.handlers_pids), + subscribers = lists:delete({Name,Args}, State#state.subscribers) } } end, {reply, Reply, NewState};