Skip to content

Commit

Permalink
Merge pull request #5892 from cloudamqp/shovel_fixes
Browse files Browse the repository at this point in the history
Tiny shovel-related fixes
  • Loading branch information
michaelklishin authored Oct 8, 2022
2 parents 567d32a + b9f7dc0 commit 7d1085e
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 21 deletions.
3 changes: 3 additions & 0 deletions deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,16 @@ nack(Tag, Multi, #{source := #{module := Mod}} = State) ->
Mod:nack(Tag, Multi, State).

%% Common functions

%% Count down until we stop publishing in on-confirm mode
decr_remaining_unacked(State = #{source := #{remaining_unacked := unlimited}}) ->
State;
decr_remaining_unacked(State = #{source := #{remaining_unacked := 0}}) ->
State;
decr_remaining_unacked(State = #{source := #{remaining_unacked := N} = Src}) ->
State#{source => Src#{remaining_unacked => N - 1}}.

%% Count down until we shut down in all modes
decr_remaining(_N, State = #{source := #{remaining := unlimited}}) ->
State;
decr_remaining(N, State = #{source := #{remaining := M} = Src,
Expand Down
11 changes: 3 additions & 8 deletions deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,11 @@
%% for testing purposes
-export([get_connection_name/1]).

-include_lib("amqp_client/include/amqp_client.hrl").
-include("rabbit_shovel.hrl").

-record(state, {inbound_conn, inbound_ch, outbound_conn, outbound_ch,
name, type, config, inbound_uri, outbound_uri, unacked,
remaining, %% [1]
remaining_unacked}). %% [2]

%% [1] Counts down until we shut down in all modes
%% [2] Counts down until we stop publishing in on-confirm mode
-record(state, {name :: binary() | {rabbit_types:vhost(), binary()},
type :: static | dynamic,
config :: rabbit_shovel_behaviour:state()}).

start_link(Type, Name, Config) ->
ShovelParameter = rabbit_shovel_util:get_shovel_parameter(Name),
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbitmq_shovel_management/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ define PROJECT_APP_EXTRA_KEYS
endef

DEPS = rabbit_common rabbit rabbitmq_management rabbitmq_shovel
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers rabbitmq_amqp1_0
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers rabbitmq_amqp1_0 meck

DEP_EARLY_PLUGINS = rabbit_common/mk/rabbitmq-early-plugin.mk
DEP_PLUGINS = rabbit_common/mk/rabbitmq-plugin.mk
Expand Down
8 changes: 3 additions & 5 deletions deps/rabbitmq_shovel_management/priv/www/js/tmpl/shovels.ejs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,11 @@
<td><%= fmt_string(shovel.vhost, '') %></td>
<% } %>
<% if (shovel.state == 'terminated') { %>
<td colspan="5"><%= fmt_state('red', shovel.state) %></td>
<td><%= shovel.timestamp %></td>
</tr>
<tr>
<td colspan="<%= 8 + extra_width %>">
<td><%= fmt_state('red', shovel.state) %></td>
<td colspan="6">
<pre><%= fmt_string(shovel.reason) %></pre>
</td>
<td><%= shovel.timestamp %></td>
<% } else { %>
<td><%= fmt_state('green', shovel.state) %></td>
<td><%= fmt_string(shovel.src_protocol) %></td>
Expand Down
18 changes: 14 additions & 4 deletions deps/rabbitmq_shovel_management/src/rabbit_shovel_mgmt.erl
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,27 @@ delete_resource(ReqData, #context{user = #user{username = Username}}=Context) ->
case is_restart(ReqData) of
true ->
rabbit_log:info("Asked to restart shovel '~s' in vhost '~ts' on node '~s'", [Name, VHost, Node]),
case rpc:call(Node, rabbit_shovel_util, restart_shovel, [VHost, Name], ?SHOVEL_CALLS_TIMEOUT_MS) of
try erpc:call(Node, rabbit_shovel_util, restart_shovel, [VHost, Name], ?SHOVEL_CALLS_TIMEOUT_MS) of
ok -> true;
{_, Msg} -> rabbit_log:error(Msg),
{error, not_found} ->
rabbit_log:error("Could not find shovel data for shovel '~s' in vhost: '~s'", [Name, VHost]),
false
catch _:Reason ->
rabbit_log:error("Failed to restart shovel '~s' on vhost '~s', reason: ~p",
[Name, VHost, Reason]),
false
end;

_ ->
rabbit_log:info("Asked to delete shovel '~s' in vhost '~ts' on node '~s'", [Name, VHost, Node]),
case rpc:call(Node, rabbit_shovel_util, delete_shovel, [VHost, Name, Username], ?SHOVEL_CALLS_TIMEOUT_MS) of
try erpc:call(Node, rabbit_shovel_util, delete_shovel, [VHost, Name, Username], ?SHOVEL_CALLS_TIMEOUT_MS) of
ok -> true;
{_, Msg} -> rabbit_log:error(Msg),
{error, not_found} ->
rabbit_log:error("Could not find shovel data for shovel '~s' in vhost: '~s'", [Name, VHost]),
false
catch _:Reason ->
rabbit_log:error("Failed to delete shovel '~s' on vhost '~s', reason: ~p",
[Name, VHost, Reason]),
false
end

Expand Down
36 changes: 33 additions & 3 deletions deps/rabbitmq_shovel_management/test/rabbit_shovel_mgmt_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

-define(MOCK_SHOVELS,
[[
{node,'node1'},
{node,node()},
{name,<<"shovel1">>},
{vhost,<<"/">>},
{type,dynamic},
Expand Down Expand Up @@ -39,14 +39,29 @@ all() ->
[
get_shovel_node_shovel_different_name,
get_shovel_node_shovel_different_vhost_name,
get_shovel_node_shovel_found
get_shovel_node_shovel_found,
delete_resource_badrpc
].

init_per_testcase(delete_resource_badrpc, _Config) ->
meck:expect(rabbit_shovel_mgmt_util, status, fun(_,_) -> ?MOCK_SHOVELS end),
meck:expect(rabbit_shovel_status, lookup,
fun({_, Name}) ->
case [S || S <- ?MOCK_SHOVELS, proplists:get_value(name, S) =:= Name] of
[Obj] -> Obj;
[] -> not_found
end
end),
_Config;
init_per_testcase(_, _Config) ->
meck:new(rabbit_shovel_mgmt_util),
meck:expect(rabbit_shovel_mgmt_util, status, fun(_,_) -> ?MOCK_SHOVELS end),
_Config.

end_per_testcase(delete_resource_badrpc, _Config) ->
meck:unload(rabbit_shovel_mgmt_util),
meck:unload(rabbit_shovel_status),
_Config;
end_per_testcase(_, _Config) ->
meck:unload(rabbit_shovel_mgmt_util),
_Config.
Expand All @@ -70,4 +85,19 @@ get_shovel_node_shovel_found(_Config) ->
Name= <<"shovel2">>,
User = #user{username="admin",tags = [administrator]},
Node = rabbit_shovel_mgmt:get_shovel_node(VHost, Name, {}, #context{user = User}),
?assertEqual('node2', Node).
?assertEqual('node2', Node).

delete_resource_badrpc(_Config) ->
VHost = <<"/">>,
Name= <<"shovel1">>,
User = #user{username="admin",tags = [administrator]},
Context = #context{user = User},
ReqData = #{path => <<"/shovels/vhost/././restart">>,
bindings => #{vhost => VHost, name => Name}},
{Reply, ReqData, Context} = rabbit_shovel_mgmt:delete_resource(ReqData, Context),
?assertEqual(false, Reply),

ReqData2 = #{path => <<"/shovels/vhost/./.">>,
bindings => #{vhost => VHost, name => Name}},
{Reply, ReqData2, Context} = rabbit_shovel_mgmt:delete_resource(ReqData2, Context),
?assertEqual(false, Reply).

0 comments on commit 7d1085e

Please sign in to comment.