diff --git a/.github/workflows/build_packages.yaml b/.github/workflows/build_packages.yaml index e552c238..587a7fba 100644 --- a/.github/workflows/build_packages.yaml +++ b/.github/workflows/build_packages.yaml @@ -48,8 +48,8 @@ jobs: - name: prepare run: | /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)" - brew install curl zip unzip gnu-sed erlang@23 - echo "$(brew --prefix)/opt/erlang@${{ matrix.otp }}/bin" >> $GITHUB_PATH + brew install curl zip unzip gnu-sed erlang@24 + echo "$(brew --prefix)/opt/erlang@24/bin" >> $GITHUB_PATH echo "$(brew --prefix)/opt/unzip/bin" >> $GITHUB_PATH echo "$(brew --prefix)/bin" >> $GITHUB_PATH - name: install rebar3 @@ -59,7 +59,7 @@ jobs: - name: build run: | echo "${PATH}" - export PATH="/usr/local/opt/erlang@23/bin:$PATH" + export PATH="/usr/local/opt/erlang@24/bin:$PATH" erl -noshell -eval 'io:format(erlang:system_info(otp_release)), halt(0).' .github/workflows/script/build.sh pkg=emqtt-macos-$(git describe --tags --always).zip diff --git a/src/emqtt.appup.src b/src/emqtt.appup.src index 6c47c5b5..8bb6bf80 100644 --- a/src/emqtt.appup.src +++ b/src/emqtt.appup.src @@ -1,6 +1,9 @@ %% -*-: erlang -*- -{"1.2.3.3", - [ {"1.2.3.2", [ +{"1.2.3.4", + [ {"1.2.3.3", [ + {load_module, emqtt, brutal_purge, soft_purge, []} + ]}, + {"1.2.3.2", [ {load_module, emqtt, brutal_purge, soft_purge, []} ]}, {"1.2.3.1", [ @@ -20,7 +23,10 @@ {load_module, emqtt_cli, brutal_purge, soft_purge, []} ]} ], - [{"1.2.3.2", [ + [{"1.2.3.3", [ + {load_module, emqtt, brutal_purge, soft_purge, []} + ]}, + {"1.2.3.2", [ {load_module, emqtt, brutal_purge, soft_purge, []} ]}, {"1.2.3.1", [ diff --git a/src/emqtt.erl b/src/emqtt.erl index 615bc4d4..a8c93d55 100644 --- a/src/emqtt.erl +++ b/src/emqtt.erl @@ -63,6 +63,7 @@ -export([subscriptions/1]). -export([info/1, stop/1]). +-export([stats/1]). 
%% For test cases -export([ pause/1 @@ -128,6 +129,8 @@ | {keepalive, non_neg_integer()} | {max_inflight, pos_integer()} | {retry_interval, timeout()} + | {max_retry_count, non_neg_integer()} + | {message_delivery_timeout, non_neg_integer()} | {will_topic, iodata()} | {will_payload, iodata()} | {will_retain, boolean()} @@ -165,6 +168,11 @@ -type(client() :: pid() | atom()). +-type(msg_inflight_meta() :: #{ first_sent_at := pos_integer(), + last_sent_at := pos_integer(), + retry_count := non_neg_integer() + }). + -opaque(mqtt_msg() :: #mqtt_msg{}). -record(state, { @@ -195,7 +203,7 @@ pending_calls :: list(), subscriptions :: map(), max_inflight :: infinity | pos_integer(), - inflight :: #{packet_id() => term()}, + inflight :: #{packet_id() => {publish | pubrel, term(), msg_inflight_meta()}}, awaiting_rel :: map(), auto_ack :: boolean(), ack_timeout :: pos_integer(), @@ -219,11 +227,11 @@ -define(WILL_MSG(QoS, Retain, Topic, Props, Payload), #mqtt_msg{qos = QoS, - retain = Retain, - topic = Topic, - props = Props, - payload = Payload - }). + retain = Retain, + topic = Topic, + props = Props, + payload = Payload + }). -define(NO_CLIENT_ID, <<>>). @@ -437,6 +445,9 @@ subscriptions(Client) -> info(Client) -> gen_statem:call(Client, info). +stats(Client) -> + gen_statem:call(Client, stats). + stop(Client) -> gen_statem:call(Client, stop). 
@@ -590,6 +601,12 @@ init([{auto_ack, AutoAck} | Opts], State) when is_boolean(AutoAck) -> init(Opts, State#state{auto_ack = AutoAck}); init([{retry_interval, I} | Opts], State) -> init(Opts, State#state{retry_interval = timer:seconds(I)}); +init([{max_retry_count, N} | Opts], State) when is_integer(N) -> + put_max_retry_count(N), + init(Opts, State); +init([{message_delivery_timeout, T} | Opts], State) when is_integer(T) -> + put_message_delivery_timeout(timer:seconds(T)), + init(Opts, State); init([{bridge_mode, Mode} | Opts], State) when is_boolean(Mode) -> init(Opts, State#state{bridge_mode = Mode}); init([_Opt | Opts], State) -> @@ -609,7 +626,7 @@ init_will_msg({qos, QoS}, WillMsg) -> init_parse_state(State = #state{proto_ver = Ver, properties = Properties}) -> MaxSize = maps:get('Maximum-Packet-Size', Properties, ?MAX_PACKET_SIZE), ParseState = emqtt_frame:initial_parse_state( - #{max_size => MaxSize, version => Ver}), + #{max_size => MaxSize, version => Ver}), State#state{parse_state = ParseState}. 
merge_opts(Defaults, Options) -> @@ -763,12 +780,17 @@ connected({call, From}, {publish, Msg = #mqtt_msg{qos = ?QOS_0}}, State) -> end; connected({call, From}, {publish, Msg = #mqtt_msg{qos = QoS}}, - State = #state{inflight = Inflight, last_packet_id = PacketId}) + State = #state{inflight = Inflight0, last_packet_id = PacketId}) when (QoS =:= ?QOS_1); (QoS =:= ?QOS_2) -> + Inflight = maybe_upgrade_inflight_data_structure(Inflight0), Msg1 = Msg#mqtt_msg{packet_id = PacketId}, case send(Msg1, State) of {ok, NewState} -> - Inflight1 = maps:put(PacketId, {publish, Msg1, os:timestamp()}, Inflight), + Now = now_ts(), + Meta = #{first_sent_at => Now, last_sent_at => Now, retry_count => 0}, + Inflight1 = maps:put(PacketId, {publish, Msg1, Meta}, Inflight), + ensure_delivery_timer(), + set_inflight_cnt(maps:size(Inflight1)), State1 = ensure_retry_timer(NewState#state{inflight = Inflight1}), Actions = [{reply, From, {ok, PacketId}}], case is_inflight_full(State1) of @@ -833,18 +855,24 @@ connected(cast, Packet = ?PUBLISH_PACKET(?QOS_2, _PacketId), State) -> connected(cast, ?PUBACK_PACKET(_PacketId, _ReasonCode, _Properties) = PubAck, State) -> {keep_state, delete_inflight(PubAck, State)}; -connected(cast, ?PUBREC_PACKET(PacketId, _ReasonCode), State = #state{inflight = Inflight}) -> - NState = case maps:find(PacketId, Inflight) of - {ok, {publish, _Msg, _Ts}} -> - Inflight1 = maps:put(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight), - State#state{inflight = Inflight1}; - {ok, {pubrel, _Ref, _Ts}} -> - ?LOG(notice, "Duplicated PUBREC Packet: ~p", [PacketId], State), - State; - error -> - ?LOG(warning, "Unexpected PUBREC Packet: ~p", [PacketId], State), - State - end, +connected(cast, ?PUBREC_PACKET(PacketId, _ReasonCode), State = #state{inflight = Inflight0}) -> + Inflight = maybe_upgrade_inflight_data_structure(Inflight0), + NState = case maps:find(PacketId, Inflight) of + {ok, {publish, _Msg, _Meta}} -> + Now = now_ts(), + Meta = #{first_sent_at => Now, + 
last_sent_at => Now, + retry_count => 0 + }, + Inflight1 = maps:put(PacketId, {pubrel, PacketId, Meta}, Inflight), + State#state{inflight = Inflight1}; + {ok, {pubrel, _Ref, _Ts}} -> + ?LOG(notice, "Duplicated PUBREC Packet: ~p", [PacketId], State), + State; + error -> + ?LOG(warning, "Unexpected PUBREC Packet: ~p", [PacketId], State), + State + end, send_puback(?PUBREL_PACKET(PacketId), NState); %%TODO::... if auto_ack is false, should we take PacketId from the map? @@ -936,10 +964,22 @@ connected(info, {timeout, TRef, ack}, State = #state{ack_timer = TRef, {keep_state, ensure_ack_timer(NewState)}; connected(info, {timeout, TRef, retry}, State = #state{retry_timer = TRef, - inflight = Inflight}) -> - case maps:size(Inflight) == 0 of + inflight = Inflight0}) -> + Inflight = maybe_upgrade_inflight_data_structure(Inflight0), + case maps:size(Inflight) == 0 of true -> {keep_state, State#state{retry_timer = undefined}}; - false -> retry_send(State) + false -> retry_send(State#state{retry_timer = undefined, inflight = Inflight}) %% retry with the upgraded inflight map, old-format entries would crash retry_send + end; + +connected(info, {timeout, _TRef, delivery_timeout}, State = #state{inflight = Inflight}) -> + clean_delivery_timer(), + case maps:size(Inflight) == 0 of + true -> + {keep_state, State}; + false -> + State1 = expiry_inflight_msgs(now_ts(), State), + ensure_delivery_timer(), + {keep_state, State1} end; connected(EventType, EventContent, Data) -> @@ -959,6 +999,9 @@ inflight_full(EventType, EventContent, Data) -> handle_event({call, From}, stop, _StateName, _State) -> {stop_and_reply, normal, [{reply, From, ok}]}; +handle_event({call, From}, stats, _StateName, _State) -> + {keep_state_and_data, [{reply, From, collect_stats()}]}; + handle_event(info, {gun_ws, ConnPid, _StreamRef, {binary, Data}}, _StateName, State = #state{socket = ConnPid}) -> ?LOG(debug, "RECV Data: ~p", [Data], State), @@ -976,8 +1019,7 @@ handle_event(info, {TcpOrSsL, _Sock, Data}, _StateName, State) handle_event(info, {Error, _Sock, Reason}, _StateName, State) when Error =:= tcp_error; 
Error =:= ssl_error -> - ?LOG(error, "The connection error occured ~p, reason:~p", - [Error, Reason], State), + ?LOG(error, "Connection error ~p, reason:~p", [Error, Reason], State), {stop, {shutdown, Reason}, State}; handle_event(info, {Closed, _Sock}, _StateName, State) @@ -1036,6 +1078,27 @@ should_ping(ConnMod, Sock) -> Error end. +%% Since in 1.2.3.3 to 1.2.3.4 upgrade, we modified the data structure stored in the inflight. +%% We need to execute the following func to upgrade the data structure for the code hot upgrading. +maybe_upgrade_inflight_data_structure(Inflight) -> + Iter = maps:iterator(Inflight), + case maps:next(Iter) of + none -> + Inflight; + {_Key, {_Type, _Msg, Meta}, _Iter1} when is_map(Meta) -> + Inflight; + {_, {_, _, OsTimestamp}, _Iter1} when is_tuple(OsTimestamp) -> + maps:map( + fun(_Key, {Type, Msg, Ts}) -> + Ms = timestamp_to_ms(Ts), + Meta = #{first_sent_at => Ms, + last_sent_at => Ms, + retry_count => 0 + }, + {Type, Msg, Meta} + end, Iter) + end. + is_inflight_full(#state{max_inflight = infinity}) -> false; is_inflight_full(#state{max_inflight = MaxLimit, inflight = Inflight}) -> @@ -1043,12 +1106,14 @@ is_inflight_full(#state{max_inflight = MaxLimit, inflight = Inflight}) -> delete_inflight(?PUBACK_PACKET(PacketId, ReasonCode, Properties), State = #state{inflight = Inflight}) -> - case maps:find(PacketId, Inflight) of - {ok, {publish, #mqtt_msg{packet_id = PacketId}, _Ts}} -> + case maps:find(PacketId, Inflight) of + {ok, {publish, #mqtt_msg{packet_id = PacketId}, _Meta}} -> ok = eval_msg_handler(State, puback, #{packet_id => PacketId, reason_code => ReasonCode, properties => Properties}), - State#state{inflight = maps:remove(PacketId, Inflight)}; + Inflight1 = maps:remove(PacketId, Inflight), + set_inflight_cnt(maps:size(Inflight1)), + State#state{inflight = Inflight1}; error -> ?LOG(warning, "Unexpected PUBACK: ~p", [PacketId], State), State @@ -1056,11 +1121,13 @@ delete_inflight(?PUBACK_PACKET(PacketId, ReasonCode, Properties), 
delete_inflight(?PUBCOMP_PACKET(PacketId, ReasonCode, Properties), State = #state{inflight = Inflight}) -> case maps:find(PacketId, Inflight) of - {ok, {pubrel, _PacketId, _Ts}} -> + {ok, {pubrel, _PacketId, _Meta}} -> ok = eval_msg_handler(State, puback, #{packet_id => PacketId, reason_code => ReasonCode, properties => Properties}), - State#state{inflight = maps:remove(PacketId, Inflight)}; + Inflight1 = maps:remove(PacketId, Inflight), + set_inflight_cnt(maps:size(Inflight1)), + State#state{inflight = Inflight1}; error -> ?LOG(warning, "Unexpected PUBCOMP Packet: ~p", [PacketId], State), State @@ -1073,6 +1140,53 @@ delete_inflight_when_full(Packet, State) -> false -> {next_state, connected, State1} end. +expiry_inflight_msgs(Now, State = #state{inflight = Inflight}) -> + Fun = fun(PacketId, {Type, Msg, Meta}, Acc) -> + SentAt = maps:get(first_sent_at, Meta), + ExpiryTime = get_message_delivery_timeout(), + case Now - SentAt of + Diff when Diff >= ExpiryTime -> + Acc#{PacketId => {Type, Msg, Meta}}; + _ -> + Acc + end + end, + ExpiredMsgs = maps:fold(Fun, #{}, Inflight), %% maps:fold/3 is (Fun, Init, Map): start from an empty map and scan Inflight for expired entries + case maps:size(ExpiredMsgs) of + 0 -> State; + _ -> + ExpiredMsgs1 = lists:sort( + fun({_, {_, _, #{last_sent_at := Ts1}}}, {_, {_, _, #{last_sent_at := Ts2}}}) -> + Ts1 < Ts2 + end, maps:to_list(ExpiredMsgs) + ), + delete_expired_inflight_msgs(ExpiredMsgs1, State) + end. 
+ +delete_expired_inflight_msgs([], State) -> + State; +delete_expired_inflight_msgs( + [{PacketId, {Type, Msg, _Meta}} | Msgs], + State = #state{inflight = Inflight} +) -> + case Type of + publish -> + #mqtt_msg{topic = Topic} = Msg, + ?LOG(info, "Message ~p expired, topic: ~ts", [PacketId, Topic], State); + pubrel -> + ?LOG(info, "Pubrel ~p expired", [PacketId], State) + end, + ok = eval_msg_handler(State, puback, #{packet_id => PacketId, + %% Treat 16#FF as internal error + reason_code => 16#FF, + reason_code_name => expired, + properties => #{}}), + inc_delivery_timeout(), + Inflight1 = maps:remove(PacketId, Inflight), + set_inflight_cnt(maps:size(Inflight1)), + State1 = State#state{inflight = Inflight1}, + delete_expired_inflight_msgs(Msgs, State1). + assign_id(?NO_CLIENT_ID, Props) -> case maps:find('Assigned-Client-Identifier', Props) of {ok, Value} -> @@ -1151,36 +1265,44 @@ do_ensure_retry_timer(_Interval, State) -> State. retry_send(State = #state{inflight = Inflight}) -> - SortFun = fun({_, _, Ts1}, {_, _, Ts2}) -> Ts1 < Ts2 end, + SortFun = fun({_, _, #{last_sent_at := Ts1}}, {_, _, #{last_sent_at := Ts2}}) -> Ts1 < Ts2 end, Msgs = lists:sort(SortFun, maps:values(Inflight)), - retry_send(Msgs, os:timestamp(), State ). + retry_send(Msgs, now_ts(), State). 
retry_send([], _Now, State) -> {keep_state, ensure_retry_timer(State)}; -retry_send([{Type, Msg, Ts} | Msgs], Now, State = #state{retry_interval = Interval}) -> - Diff = timer:now_diff(Now, Ts) div 1000, %% micro -> ms - case (Diff >= Interval) of - true -> case retry_send(Type, Msg, Now, State) of +retry_send([{Type, Msg, Meta} | Msgs], Now, State = #state{retry_interval = Interval}) -> + Ts = maps:get(last_sent_at, Meta), + Tried = maps:get(retry_count, Meta), + Diff = Now - Ts, + case {Tried >= get_max_retry_count(), (Diff >= Interval)} of + {true, _} -> + retry_send(Msgs, Now, State); + {false, true} -> case retry_send(Type, Msg, Meta, Now, State) of {ok, NewState} -> retry_send(Msgs, Now, NewState); {error, Error} -> {stop, Error} end; - false -> {keep_state, do_ensure_retry_timer(Interval - Diff, State)} + {false, false} -> {keep_state, do_ensure_retry_timer(Interval - Diff, State)} end. retry_send(publish, Msg = #mqtt_msg{qos = QoS, packet_id = PacketId}, - Now, State = #state{inflight = Inflight}) -> + Meta, Now, State = #state{inflight = Inflight}) -> Msg1 = Msg#mqtt_msg{dup = (QoS =:= ?QOS_1)}, case send(Msg1, State) of {ok, NewState} -> - Inflight1 = maps:put(PacketId, {publish, Msg1, Now}, Inflight), + RetryCount = maps:get(retry_count, Meta), + Meta1 = Meta#{last_sent_at => Now, retry_count => RetryCount + 1}, + Inflight1 = maps:put(PacketId, {publish, Msg1, Meta1}, Inflight), {ok, NewState#state{inflight = Inflight1}}; Error = {error, _Reason} -> Error end; -retry_send(pubrel, PacketId, Now, State = #state{inflight = Inflight}) -> +retry_send(pubrel, PacketId, Meta, Now, State = #state{inflight = Inflight}) -> case send(?PUBREL_PACKET(PacketId), State) of {ok, NewState} -> - Inflight1 = maps:put(PacketId, {pubrel, PacketId, Now}, Inflight), + RetryCount = maps:get(retry_count, Meta), + Meta1 = Meta#{last_sent_at => Now, retry_count => RetryCount + 1}, + Inflight1 = maps:put(PacketId, {pubrel, PacketId, Meta1}, Inflight), {ok, NewState#state{inflight = 
Inflight1}}; Error = {error, _Reason} -> Error @@ -1195,6 +1317,12 @@ deliver(#mqtt_msg{qos = QoS, dup = Dup, retain = Retain, packet_id = PacketId, ok = eval_msg_handler(State, publish, Msg), State. +now_ts() -> + erlang:system_time(millisecond). + +timestamp_to_ms(Ts) -> + timer:now_diff(Ts, {0, 0, 0}) div 1000. + eval_msg_handler(#state{msg_handler = ?NO_MSG_HDLR, owner = Owner}, disconnected, {ReasonCode, Properties}) when is_integer(ReasonCode) -> @@ -1380,3 +1508,72 @@ reason_code_name(16#A0) -> maximum_connect_time; reason_code_name(16#A1) -> subscription_identifiers_not_supported; reason_code_name(16#A2) -> wildcard_subscriptions_not_supported; reason_code_name(_Code) -> unknown_error. + +%%-------------------------------------------------------------------- +%% Process Dictionary + +%% Added for 1.2.3.4 +%% +%% Due to compatibility issues, we can't directly modify the structure of the state to store +%% these newly added configurations. So we put `max_retry_count` and `message_delivery_timeout` +%% into the process dictionary temporarily. +put_max_retry_count(N) when is_integer(N) -> + erlang:put(max_retry_count, N). + +get_max_retry_count() -> + Default = 3, + case erlang:get(max_retry_count) of + undefined -> Default; + Value -> Value + end. + +put_message_delivery_timeout(T) when is_integer(T) -> + erlang:put(message_delivery_timeout, T). + +get_message_delivery_timeout() -> + Default = timer:seconds(30), + case erlang:get(message_delivery_timeout) of + undefined -> Default; + Value -> Value + end. + +ensure_delivery_timer() -> + Interval = get_message_delivery_timeout() + 1000, + case erlang:get(delivery_timer) of + undefined -> + TRef = erlang:start_timer(Interval, self(), delivery_timeout), + erlang:put(delivery_timer, TRef); + _TRef -> + ok + end. + +clean_delivery_timer() -> + case erlang:get(delivery_timer) of + undefined -> ok; + TRef -> + erlang:cancel_timer(TRef), + erlang:erase(delivery_timer) + end. 
+ +collect_stats() -> + #{delivery_timeout => get_stats_delivery_timeout(), + inflight_cnt => get_inflight_cnt() + }. + +get_stats_delivery_timeout() -> + case erlang:get(delivery_timeout) of + undefined -> 0; + Value -> Value + end. + +inc_delivery_timeout() -> + erlang:put(delivery_timeout, get_stats_delivery_timeout() + 1). + +set_inflight_cnt(Count) when is_integer(Count) -> + erlang:put(inflight_cnt, Count). + +get_inflight_cnt() -> + case erlang:get(inflight_cnt) of + undefined -> 0; + Value -> Value + end.