Skip to content

Commit

Permalink
Merge pull request #1 from sokal32/master
Browse files Browse the repository at this point in the history
added props passing (publish)
  • Loading branch information
5HT committed Apr 29, 2015
2 parents 80f976a + 9765622 commit c8693ab
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 12 deletions.
17 changes: 9 additions & 8 deletions src/mqs.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ stop(_State) ->
true -> ok end,
ok.

subscribe(Name, Args) -> gen_server:call(?SERVER, {sub, Name, Args}, infinity).
publish(Name, Message) -> gen_server:call(?SERVER, {pub, Name, Message}, infinity).
close(Name) -> gen_server:call(?SERVER, {unsub, Name}, infinity).
suspend(Name) -> gen_server:call(?SERVER, {suspend, Name}, infinity).
resume(Name) -> gen_server:call(?SERVER, {resume, Name}, infinity).

shutdown() -> gen_server:call(?SERVER, stop, infinity).
terminated(Pid) -> gen_server:cast(?SERVER, {terminated, Pid}).
subscribe(Name, Args) -> gen_server:call(?SERVER, {sub, Name, Args}, infinity).
publish(Name, Message) -> gen_server:call(?SERVER, {pub, Name, Message}, infinity).
publish(Name, Message, Props) -> gen_server:call(?SERVER, {pub, Name, Message, Props}, infinity).
close(Name) -> gen_server:call(?SERVER, {unsub, Name}, infinity).
suspend(Name) -> gen_server:call(?SERVER, {suspend, Name}, infinity).
resume(Name) -> gen_server:call(?SERVER, {resume, Name}, infinity).

shutdown() -> gen_server:call(?SERVER, stop, infinity).
terminated(Pid) -> gen_server:cast(?SERVER, {terminated, Pid}).

log_modules() -> [mqs_worker,mqs,mqs_manager].
-define(ALLOWED, (application:get_env(mqs,log_modules,mqs))).
Expand Down
6 changes: 5 additions & 1 deletion src/mqs_manager.erl
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
-module(mqs_manager).
-author('Max Davidenko').
-behaviour(gen_server).
-include("mqs.hrl").
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-compile(export_all).
Expand All @@ -27,13 +28,16 @@ handle_call({sub, Name, Args}, _From, State) ->
{reply, Reply, NewState};

handle_call({pub, Name, Message}, _From, State) ->
handle_call({pub, Name, Message, #'P_basic'{}}, _From, State);

handle_call({pub, Name, Message, Props = #'P_basic'{}}, _From, State) ->
HandlerPid = dict:find(Name, State#state.messages_handlers),
Reply = case HandlerPid of
error ->
mqs:error(?MODULE,"Unable to send message. Handler with specified name not found. Name - ~p", [Name]),
{error, cant_send_message};
{ok, {Pid, _}} ->
gen_server:call(Pid, {pub, term_to_binary(Message)})
gen_server:call(Pid, {pub, term_to_binary(Message), Props})
end,
{reply, Reply, State};

Expand Down
7 changes: 4 additions & 3 deletions src/mqs_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ init(Args) ->
_ -> {ok, State} end;
_ -> RouteCreated end.

handle_call({pub, Message}, _From, State) ->
handle_call({pub, Message, Props = #'P_basic'{}}, _From, State) ->
Channel = State#state.channel,
MsgPublish = #'basic.publish'{exchange=State#state.args#mqs.name,routing_key=State#state.args#mqs.key},
amqp_channel:cast(Channel, MsgPublish, #amqp_msg{payload = Message}),
amqp_channel:cast(Channel, MsgPublish, #amqp_msg{payload = Message, props = Props}),
Reply = {ok, "Message successfuly sent"},
{reply, Reply, State};

Expand Down Expand Up @@ -88,10 +88,11 @@ handle_info(rejecting, State) ->

handle_info({#'basic.deliver'{delivery_tag = Tag}, Message}, State) ->
Payload = Message#amqp_msg.payload,
Props = Message#amqp_msg.props,
Spec = State#state.args,
Mod = Spec#mqs.'mod',
Fun = Spec#mqs.'fun',
InvokeResult = try Mod:Fun({Payload,Spec#mqs.arg})
InvokeResult = try Mod:Fun({Payload,Spec#mqs.arg,Props})
catch _:_ -> {error, msg_callback_failed} end,
case InvokeResult of
{ok, _} -> amqp_channel:cast(State#state.channel,#'basic.ack'{delivery_tag=Tag});
Expand Down

0 comments on commit c8693ab

Please sign in to comment.