From 8b08bcbdc071652d30c9b24dcb0bde71ec91630c Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 20 Jun 2024 11:28:09 +0800 Subject: [PATCH] chore: rename option to message_delivery_timeout --- src/emqtt.erl | 67 +++++++++++++++++++++++++++++++-------------------- 1 file changed, 41 insertions(+), 26 deletions(-) diff --git a/src/emqtt.erl b/src/emqtt.erl index f20f411..dad153a 100644 --- a/src/emqtt.erl +++ b/src/emqtt.erl @@ -63,6 +63,7 @@ -export([subscriptions/1]). -export([info/1, stop/1]). +-export([stats/1]). %% For test cases -export([ pause/1 @@ -129,7 +130,7 @@ | {max_inflight, pos_integer()} | {retry_interval, timeout()} | {max_retry_count, non_neg_integer()} - | {message_expiry_interval, non_neg_integer()} + | {message_delivery_timeout, non_neg_integer()} | {will_topic, iodata()} | {will_payload, iodata()} | {will_retain, boolean()} @@ -444,6 +445,9 @@ subscriptions(Client) -> info(Client) -> gen_statem:call(Client, info). +stats(Client) -> + gen_statem:call(Client, stats). + stop(Client) -> gen_statem:call(Client, stop). @@ -600,8 +604,8 @@ init([{retry_interval, I} | Opts], State) -> init([{max_retry_count, N} | Opts], State) when is_integer(N) -> put_max_retry_count(N), init(Opts, State); -init([{message_expiry_interval, T} | Opts], State) when is_integer(T) -> - put_message_expiry_interval(timer:seconds(T)), +init([{message_delivery_timeout, T} | Opts], State) when is_integer(T) -> + put_message_delivery_timeout(timer:seconds(T)), init(Opts, State); init([{bridge_mode, Mode} | Opts], State) when is_boolean(Mode) -> init(Opts, State#state{bridge_mode = Mode}); @@ -785,7 +789,7 @@ connected({call, From}, {publish, Msg = #mqtt_msg{qos = QoS}}, Now = now_ts(), Meta = #{first_sent_at => Now, last_sent_at => Now, retry_count => 0}, Inflight1 = maps:put(PacketId, {publish, Msg1, Meta}, Inflight), - ensure_expiry_timer(), + ensure_delivery_timer(), State1 = ensure_retry_timer(NewState#state{inflight = Inflight1}), Actions = [{reply, From, {ok, PacketId}}], case is_inflight_full(State1) of @@ -966,14 +970,14 @@ connected(info, {timeout, TRef, retry}, State = #state{retry_timer = TRef, false -> retry_send(State#state{retry_timer = undefined}) end; -connected(info, {timeout, _TRef, expiry}, State = #state{inflight = Inflight}) -> - clean_expiry_timer(), +connected(info, {timeout, _TRef, delivery_timeout}, State = #state{inflight = Inflight}) -> + clean_delivery_timer(), case maps:size(Inflight) == 0 of true -> {keep_state, State}; false -> State1 = expiry_inflight_msgs(now_ts(), State), - ensure_expiry_timer(), + ensure_delivery_timer(), {keep_state, State1} end; @@ -994,6 +998,9 @@ inflight_full(EventType, EventContent, Data) -> handle_event({call, From}, stop, _StateName, _State) -> {stop_and_reply, normal, [{reply, From, ok}]}; +handle_event({call, From}, stats, _StateName, _State) -> + {keep_state_and_data, [{reply, From, collect_stats()}]}; + handle_event(info, {gun_ws, ConnPid, _StreamRef, {binary, Data}}, _StateName, State = #state{socket = ConnPid}) -> ?LOG(debug, "RECV Data: ~p", [Data], State), @@ -1132,7 +1139,7 @@ delete_inflight_when_full(Packet, State) -> expiry_inflight_msgs(Now, State = #state{inflight = Inflight}) -> Fun = fun(PacketId, {Type, Msg, Meta}, Acc) -> SentAt = maps:get(first_sent_at, Meta), - ExpiryTime = message_expiry_interval({Type, Msg, Meta}), + ExpiryTime = get_message_delivery_timeout(), case Now - SentAt of Diff when Diff >= ExpiryTime -> Acc#{PacketId => {Type, Msg, Meta}}; @@ -1152,11 +1159,6 @@ expiry_inflight_msgs(Now, State = #state{inflight = Inflight}) -> delete_expired_inflight_msgs(ExpiredMsgs1, State) end. -message_expiry_interval({publish, #mqtt_msg{props = Props}, _Meta}) when is_map(Props) -> - maps:get('Message-Expiry-Interval', Props, get_message_expiry_interval()); -message_expiry_interval(_) -> - get_message_expiry_interval(). - delete_expired_inflight_msgs([], State) -> State; delete_expired_inflight_msgs( @@ -1175,6 +1177,7 @@ delete_expired_inflight_msgs( reason_code => 16#FF, reason_code_name => expired, properties => #{}}), + inc_delivery_timeout(), State1 = State#state{inflight = maps:remove(PacketId, Inflight)}, delete_expired_inflight_msgs(Msgs, State1). @@ -1506,7 +1509,7 @@ reason_code_name(_Code) -> unknown_error. %% Added for 1.2.3.4 %% %% Due to compatibility issues, we can't directly modify the structure of the state to store -%% these newly added configurations. So we put `max_retry_count` and `message_expiry_interval` +%% these newly added configurations. So we put `max_retry_count` and `message_delivery_timeout` %% into the process dictionary temporarily. put_max_retry_count(N) when is_integer(N) -> erlang:put(max_retry_count, N). @@ -1518,30 +1521,42 @@ get_max_retry_count() -> Value -> Value end. -put_message_expiry_interval(T) when is_integer(T) -> - erlang:put(message_expiry_interval, T). +put_message_delivery_timeout(T) when is_integer(T) -> + erlang:put(message_delivery_timeout, T). -get_message_expiry_interval() -> +get_message_delivery_timeout() -> Default = timer:seconds(30), - case erlang:get(message_expiry_interval) of + case erlang:get(message_delivery_timeout) of undefined -> Default; Value -> Value end. -ensure_expiry_timer() -> - Interval = get_message_expiry_interval() + 1000, - case erlang:get(expiry_timer) of +ensure_delivery_timer() -> + Interval = get_message_delivery_timeout() + 1000, + case erlang:get(delivery_timer) of undefined -> - TRef = erlang:start_timer(Interval, self(), expiry), - erlang:put(expiry_timer, TRef); + TRef = erlang:start_timer(Interval, self(), delivery_timeout), + erlang:put(delivery_timer, TRef); _TRef -> ok end. -clean_expiry_timer() -> - case erlang:get(expiry_timer) of +clean_delivery_timer() -> + case erlang:get(delivery_timer) of undefined -> ok; TRef -> erlang:cancel_timer(TRef), - erlang:erase(expiry_timer) + erlang:erase(delivery_timer) end. + +collect_stats() -> + #{delivery_timeout => get_stats_delivery_timeout()}. + +get_stats_delivery_timeout() -> + case erlang:get(delivery_timeout) of + undefined -> 0; + Value -> Value + end. + +inc_delivery_timeout() -> + erlang:put(delivery_timeout, get_stats_delivery_timeout() + 1).