Skip to content

Commit

Permalink
Merge pull request #6467 from rabbitmq/cmq-optimisations
Browse files Browse the repository at this point in the history
Classic Queue Mirroring: reduce memory usage during dead-lettering of many messages
  • Loading branch information
michaelklishin authored Nov 25, 2022
2 parents f660292 + 72373d2 commit e163ab4
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 37 deletions.
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_dead_letter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ group_by_queue_and_reason(Tables) ->
ensure_xdeath_event_count(Augmented, N),
Key, SeenKeys, Acc),
{sets:add_element(Key, SeenKeys), Acc1}
end, {sets:new(), []}, Tables),
end, {sets:new([{version, 2}]), []}, Tables),
Grouped.

update_x_death_header(Info, undefined) ->
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_mirror_queue_master.erl
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ init_with_existing_bq(Q0, BQ, BQS) when ?is_amqqueue(Q0) ->
backing_queue_state = BQS,
seen_status = #{},
confirmed = [],
known_senders = sets:new(),
known_senders = sets:new([{version, 2}]),
wait_timeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000)};
{error, Reason} ->
%% The GM can shutdown before the coordinator has started up
Expand Down
44 changes: 17 additions & 27 deletions deps/rabbit/src/rabbit_mirror_queue_slave.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
-export([set_maximum_since_use/2, info/1, go/2]).

-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, handle_pre_hibernate/1, prioritise_call/4,
prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
code_change/3, handle_pre_hibernate/1, format_message_queue/2]).

-export([joined/2, members_changed/3, handle_msg/3, handle_terminate/2]).
-export([joined/2, members_changed/3, handle_msg/3, handle_terminate/2,
prioritise_cast/3, prioritise_info/3]).

-behaviour(gen_server2).
-behaviour(gm).
Expand Down Expand Up @@ -66,6 +66,19 @@
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).


prioritise_cast(Msg, _Len, _State) ->
case Msg of
{run_backing_queue, _Mod, _Fun} -> 6;
_ -> 0
end.

prioritise_info(Msg, _Len, _State) ->
case Msg of
sync_timeout -> 6;
_ -> 0
end.

info(QPid) -> gen_server2:call(QPid, info, infinity).

init(Q) when ?is_amqqueue(Q) ->
Expand Down Expand Up @@ -446,29 +459,6 @@ handle_pre_hibernate(State = #state { backing_queue = BQ,
BQS3 = BQ:handle_pre_hibernate(BQS2),
{hibernate, stop_rate_timer(State #state { backing_queue_state = BQS3 })}.

prioritise_call(Msg, _From, _Len, _State) ->
case Msg of
info -> 9;
{gm_deaths, _Dead} -> 5;
_ -> 0
end.

prioritise_cast(Msg, _Len, _State) ->
case Msg of
{set_ram_duration_target, _Duration} -> 8;
{set_maximum_since_use, _Age} -> 8;
{run_backing_queue, _Mod, _Fun} -> 6;
{gm, _Msg} -> 5;
_ -> 0
end.

prioritise_info(Msg, _Len, _State) ->
case Msg of
update_ram_duration -> 8;
sync_timeout -> 6;
_ -> 0
end.

format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).

%% ---------------------------------------------------------------------------
Expand Down Expand Up @@ -878,7 +868,7 @@ maybe_enqueue_message(

get_sender_queue(ChPid, SQ) ->
case maps:find(ChPid, SQ) of
error -> {queue:new(), sets:new(), running};
error -> {queue:new(), sets:new([{version, 2}]), running};
{ok, Val} -> Val
end.

Expand Down
12 changes: 4 additions & 8 deletions deps/rabbit_common/src/gen_server2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1326,10 +1326,8 @@ find_prioritisers(GS2State = #gs2_state { mod = Mod }) ->
function_exported_or_default(Mod, Fun, Arity, Default) ->
case erlang:function_exported(Mod, Fun, Arity) of
true -> case Arity of
3 -> fun (Msg, GS2State = #gs2_state { queue = Queue,
state = State }) ->
Length = priority_queue:len(Queue),
case catch Mod:Fun(Msg, Length, State) of
3 -> fun (Msg, GS2State = #gs2_state { state = State }) ->
case catch Mod:Fun(Msg, 0, State) of
drop ->
drop;
Res when is_integer(Res) ->
Expand All @@ -1338,10 +1336,8 @@ function_exported_or_default(Mod, Fun, Arity, Default) ->
handle_common_termination(Err, Msg, GS2State)
end
end;
4 -> fun (Msg, From, GS2State = #gs2_state { queue = Queue,
state = State }) ->
Length = priority_queue:len(Queue),
case catch Mod:Fun(Msg, From, Length, State) of
4 -> fun (Msg, From, GS2State = #gs2_state { state = State }) ->
case catch Mod:Fun(Msg, From, 0, State) of
Res when is_integer(Res) ->
Res;
Err ->
Expand Down

0 comments on commit e163ab4

Please sign in to comment.