Skip to content

Commit

Permalink
chore: rename option to message_delivery_timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
HJianBo committed Jun 20, 2024
1 parent 43ba7d5 commit 8b08bcb
Showing 1 changed file with 41 additions and 26 deletions.
67 changes: 41 additions & 26 deletions src/emqtt.erl
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
-export([subscriptions/1]).

-export([info/1, stop/1]).
-export([stats/1]).

%% For test cases
-export([ pause/1
Expand Down Expand Up @@ -129,7 +130,7 @@
| {max_inflight, pos_integer()}
| {retry_interval, timeout()}
| {max_retry_count, non_neg_integer()}
| {message_expiry_interval, non_neg_integer()}
| {message_delivery_timeout, non_neg_integer()}
| {will_topic, iodata()}
| {will_payload, iodata()}
| {will_retain, boolean()}
Expand Down Expand Up @@ -444,6 +445,9 @@ subscriptions(Client) ->
info(Client) ->
gen_statem:call(Client, info).

stats(Client) ->
gen_statem:call(Client, stats).

stop(Client) ->
gen_statem:call(Client, stop).

Expand Down Expand Up @@ -600,8 +604,8 @@ init([{retry_interval, I} | Opts], State) ->
init([{max_retry_count, N} | Opts], State) when is_integer(N) ->
put_max_retry_count(N),
init(Opts, State);
init([{message_expiry_interval, T} | Opts], State) when is_integer(T) ->
put_message_expiry_interval(timer:seconds(T)),
init([{message_delivery_timeout, T} | Opts], State) when is_integer(T) ->
put_message_delivery_timeout(timer:seconds(T)),
init(Opts, State);
init([{bridge_mode, Mode} | Opts], State) when is_boolean(Mode) ->
init(Opts, State#state{bridge_mode = Mode});
Expand Down Expand Up @@ -785,7 +789,7 @@ connected({call, From}, {publish, Msg = #mqtt_msg{qos = QoS}},
Now = now_ts(),
Meta = #{first_sent_at => Now, last_sent_at => Now, retry_count => 0},
Inflight1 = maps:put(PacketId, {publish, Msg1, Meta}, Inflight),
ensure_expiry_timer(),
ensure_delivery_timer(),
State1 = ensure_retry_timer(NewState#state{inflight = Inflight1}),
Actions = [{reply, From, {ok, PacketId}}],
case is_inflight_full(State1) of
Expand Down Expand Up @@ -966,14 +970,14 @@ connected(info, {timeout, TRef, retry}, State = #state{retry_timer = TRef,
false -> retry_send(State#state{retry_timer = undefined})
end;

connected(info, {timeout, _TRef, expiry}, State = #state{inflight = Inflight}) ->
clean_expiry_timer(),
connected(info, {timeout, _TRef, delivery_timeout}, State = #state{inflight = Inflight}) ->
clean_delivery_timer(),
case maps:size(Inflight) == 0 of
true ->
{keep_state, State};
false ->
State1 = expiry_inflight_msgs(now_ts(), State),
ensure_expiry_timer(),
ensure_delivery_timer(),
{keep_state, State1}
end;

Expand All @@ -994,6 +998,9 @@ inflight_full(EventType, EventContent, Data) ->
handle_event({call, From}, stop, _StateName, _State) ->
{stop_and_reply, normal, [{reply, From, ok}]};

handle_event({call, From}, stats, _StateName, _State) ->
{keep_state_and_data, [{reply, From, collect_stats()}]};

handle_event(info, {gun_ws, ConnPid, _StreamRef, {binary, Data}},
_StateName, State = #state{socket = ConnPid}) ->
?LOG(debug, "RECV Data: ~p", [Data], State),
Expand Down Expand Up @@ -1132,7 +1139,7 @@ delete_inflight_when_full(Packet, State) ->
expiry_inflight_msgs(Now, State = #state{inflight = Inflight}) ->
Fun = fun(PacketId, {Type, Msg, Meta}, Acc) ->
SentAt = maps:get(first_sent_at, Meta),
ExpiryTime = message_expiry_interval({Type, Msg, Meta}),
ExpiryTime = get_message_delivery_timeout(),
case Now - SentAt of
Diff when Diff >= ExpiryTime ->
Acc#{PacketId => {Type, Msg, Meta}};
Expand All @@ -1152,11 +1159,6 @@ expiry_inflight_msgs(Now, State = #state{inflight = Inflight}) ->
delete_expired_inflight_msgs(ExpiredMsgs1, State)
end.

message_expiry_interval({publish, #mqtt_msg{props = Props}, _Meta}) when is_map(Props) ->
maps:get('Message-Expiry-Interval', Props, get_message_expiry_interval());
message_expiry_interval(_) ->
get_message_expiry_interval().

delete_expired_inflight_msgs([], State) ->
State;
delete_expired_inflight_msgs(
Expand All @@ -1175,6 +1177,7 @@ delete_expired_inflight_msgs(
reason_code => 16#FF,
reason_code_name => expired,
properties => #{}}),
inc_delivery_timeout(),
State1 = State#state{inflight = maps:remove(PacketId, Inflight)},
delete_expired_inflight_msgs(Msgs, State1).

Expand Down Expand Up @@ -1506,7 +1509,7 @@ reason_code_name(_Code) -> unknown_error.
%% Added for 1.2.3.4
%%
%% Due to compatibility issues, we can't directly modify the structure of the state to store
%% these newly added configurations. So we put `max_retry_count` and `message_expiry_interval`
%% these newly added configurations. So we put `max_retry_count` and `message_delivery_timeout`
%% into the process dictionary temporarily.
put_max_retry_count(N) when is_integer(N) ->
erlang:put(max_retry_count, N).
Expand All @@ -1518,30 +1521,42 @@ get_max_retry_count() ->
Value -> Value
end.

put_message_expiry_interval(T) when is_integer(T) ->
erlang:put(message_expiry_interval, T).
put_message_delivery_timeout(T) when is_integer(T) ->
erlang:put(message_delivery_timeout, T).

get_message_expiry_interval() ->
get_message_delivery_timeout() ->
Default = timer:seconds(30),
case erlang:get(message_expiry_interval) of
case erlang:get(message_delivery_timeout) of
undefined -> Default;
Value -> Value
end.

ensure_expiry_timer() ->
Interval = get_message_expiry_interval() + 1000,
case erlang:get(expiry_timer) of
ensure_delivery_timer() ->
Interval = get_message_delivery_timeout() + 1000,
case erlang:get(delivery_timer) of
undefined ->
TRef = erlang:start_timer(Interval, self(), expiry),
erlang:put(expiry_timer, TRef);
TRef = erlang:start_timer(Interval, self(), delivery_timeout),
erlang:put(delivery_timer, TRef);
_TRef ->
ok
end.

clean_expiry_timer() ->
case erlang:get(expiry_timer) of
clean_delivery_timer() ->
case erlang:get(delivery_timer) of
undefined -> ok;
TRef ->
erlang:cancel_timer(TRef),
erlang:erase(expiry_timer)
erlang:erase(delivery_timer)
end.

collect_stats() ->
#{delivery_timeout => get_stats_delivery_timeout()}.

get_stats_delivery_timeout() ->
case erlang:get(delivery_timeout) of
undefined -> 0;
Value -> Value
end.

inc_delivery_timeout() ->
erlang:put(delivery_timeout, get_stats_delivery_timeout() + 1).

0 comments on commit 8b08bcb

Please sign in to comment.