From 0cfa735a7e160d520bdcc326ecf8c708d9b74667 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 24 Aug 2022 10:31:29 +0800 Subject: [PATCH] refactor: safer return type for emqtt_inflight --- src/emqtt.erl | 70 ++++++++++++++++++++------------------- src/emqtt_inflight.erl | 74 +++++++++++++++++++++++++++--------------- test/emqtt_SUITE.erl | 2 +- 3 files changed, 84 insertions(+), 62 deletions(-) diff --git a/src/emqtt.erl b/src/emqtt.erl index 6c62fdb3..8ed70fa0 100644 --- a/src/emqtt.erl +++ b/src/emqtt.erl @@ -265,7 +265,6 @@ -type(sent_at() :: non_neg_integer()). %% in millisecond - -export_type([publish_success/0, publish_reply/0]). -define(ASYNC_PUB(Msg, ExpireAt, Callback), @@ -1098,12 +1097,10 @@ connected(info, ?ASYNC_PUB(Msg = #mqtt_msg{qos = ?QOS_0}, ExpireAt, Callback), S shoot(?PUB_REQ(Msg, ExpireAt, Callback), State0); connected(info, ?ASYNC_PUB(Msg = #mqtt_msg{qos = QoS}, ExpireAt, Callback), - State0 = #state{pendings = Pendings}) + State0 = #state{pendings = Pendings0}) when QoS == ?QOS_1; QoS == ?QOS_2 -> - maybe_shoot( - State0#state{ - pendings = enqueue_publish_req(?PUB_REQ(Msg, ExpireAt, Callback), Pendings) - }); + Pendings = enqueue_publish_req(?PUB_REQ(Msg, ExpireAt, Callback), Pendings0), + maybe_shoot(State0#state{pendings = Pendings}); connected(EventType, EventContent, Data) -> handle_event(EventType, EventContent, connected, Data). @@ -1335,11 +1332,11 @@ shoot(?PUB_REQ(Msg = #mqtt_msg{qos = QoS}, ExpireAt, Callback), Msg1 = Msg#mqtt_msg{packet_id = PacketId}, case send(Msg1, State) of {ok, NState} -> - Inflight1 = emqtt_inflight:insert( - PacketId, - ?INFLIGHT_PUBLISH(Msg1, now_ts(), ExpireAt, Callback), - Inflight - ), + {ok, Inflight1} = emqtt_inflight:insert( + PacketId, + ?INFLIGHT_PUBLISH(Msg1, now_ts(), ExpireAt, Callback), + Inflight + ), State1 = ensure_retry_timer(NState#state{inflight = Inflight1}), maybe_shoot(bump_last_packet_id(State1)); {error, Reason} -> @@ -1364,7 +1361,7 @@ ack_inflight( State = #state{inflight = Inflight} ) -> case emqtt_inflight:delete(PacketId, Inflight) of - {?INFLIGHT_PUBLISH(_Msg, _SentAt, _ExpireAt, Callback), NInflight} -> + {{value, ?INFLIGHT_PUBLISH(_Msg, _SentAt, _ExpireAt, Callback)}, NInflight} -> eval_callback_handler( {ok, #{packet_id => PacketId, reason_code => ReasonCode, @@ -1382,18 +1379,19 @@ ack_inflight( State = #state{inflight = Inflight} ) -> case emqtt_inflight:delete(PacketId, Inflight) of - {?INFLIGHT_PUBLISH(_Msg, _SentAt, ExpireAt, Callback), Inflight1} -> + {{value, ?INFLIGHT_PUBLISH(_Msg, _SentAt, ExpireAt, Callback)}, Inflight1} -> eval_callback_handler( {ok, #{packet_id => PacketId, reason_code => ReasonCode, reason_code_name => reason_code_name(ReasonCode), properties => Properties}}, Callback), - NInflight = emqtt_inflight:insert( - PacketId, - ?INFLIGHT_PUBREL(PacketId, now_ts(), ExpireAt), - Inflight1), + {ok, NInflight} = emqtt_inflight:insert( + PacketId, + ?INFLIGHT_PUBREL(PacketId, now_ts(), ExpireAt), + Inflight1 + ), State#state{inflight = NInflight}; - {?INFLIGHT_PUBREL(_PacketId, _SentAt, _ExpireAt), _} -> + {{value, ?INFLIGHT_PUBREL(_PacketId, _SentAt, _ExpireAt)}, _} -> ?LOG(notice, "duplicated_PUBREC_packet", #{packet_id => PacketId}, State), State; error -> @@ -1406,7 +1404,7 @@ ack_inflight( State = #state{inflight = Inflight} ) -> case emqtt_inflight:delete(PacketId, Inflight) of - {?INFLIGHT_PUBREL(_PacketId, _SentAt, _ExpireAt), NInflight} -> + {{value, ?INFLIGHT_PUBREL(_PacketId, _SentAt, _ExpireAt)}, NInflight} -> State#state{inflight = NInflight}; error -> ?LOG(warning, "unexpected_PUBCOMP", #{packet_id => PacketId}, State), @@ -1524,33 +1522,37 @@ retry_send(State = #state{retry_interval = Intv, inflight = Inflight}) -> try Now = now_ts(), Pred = fun(_, InflightReq) -> - (sent_at(InflightReq) + timer:seconds(Intv)) > Now end, - NInflight = lists:foldl( - fun({PacketId, InflightReq}, InflightAcc) -> - NReq = retry_send(Now, InflightReq, State), - emqtt_inflight:update(PacketId, NReq, InflightAcc) - end, Inflight, emqtt_inflight:retry(Pred, Inflight) - ), - {keep_state, ensure_retry_timer(State#state{inflight = NInflight})} + (sent_at(InflightReq) + timer:seconds(Intv)) > Now + end, + NState = retry_send(Now, emqtt_inflight:to_retry_list(Pred, Inflight), State), + {keep_state, ensure_retry_timer(NState)} catch error : Reason -> shutdown(Reason, State) end. -retry_send(Now, ?INFLIGHT_PUBLISH(Msg, _SentAt, _ExpireAt, _Callback), State) -> +retry_send(Now, [{PacketId, ?INFLIGHT_PUBLISH(Msg, _, ExpireAt, Callback)} | More], + State = #state{inflight = Inflight}) -> Msg1 = Msg#mqtt_msg{dup = true}, case send(Msg1, State) of - {ok, _NewState} -> - ?INFLIGHT_PUBLISH(Msg1, Now, _ExpireAt, _Callback); + {ok, NState} -> + NInflightReq = ?INFLIGHT_PUBLISH(Msg1, Now, ExpireAt, Callback), + {ok, NInflight} = emqtt_inflight:update(PacketId, NInflightReq, Inflight), + retry_send(Now, More, NState#state{inflight = NInflight}); {error, Reason} -> error(Reason) end; -retry_send(Now, ?INFLIGHT_PUBREL(PacketId, _SentAt, _ExpireAt), State) -> +retry_send(Now, [{PacketId, ?INFLIGHT_PUBREL(PacketId, _, ExpireAt)} | More], + State = #state{inflight = Inflight}) -> case send(?PUBREL_PACKET(PacketId), State) of - {ok, _NewState} -> - ?INFLIGHT_PUBREL(PacketId, Now, _ExpireAt); + {ok, NState} -> + NInflightReq = ?INFLIGHT_PUBREL(PacketId, Now, ExpireAt), + {ok, NInflight} = emqtt_inflight:update(PacketId, NInflightReq, Inflight), + retry_send(Now, More, NState#state{inflight = NInflight}); {error, Reason} -> error(Reason) - end. + end; +retry_send(_Now, [], State) -> + State. deliver(#mqtt_msg{qos = QoS, dup = Dup, retain = Retain, packet_id = PacketId, topic = Topic, props = Props, payload = Payload}, diff --git a/src/emqtt_inflight.erl b/src/emqtt_inflight.erl index d0af2276..29fef7f9 100644 --- a/src/emqtt_inflight.erl +++ b/src/emqtt_inflight.erl @@ -30,7 +30,7 @@ , is_full/1 , is_empty/1 , foreach/2 - , retry/2 + , to_retry_list/2 ]). -type(inflight() :: inflight(req())). @@ -48,7 +48,7 @@ -type(req() :: term()). --export_type([inflight/1]). +-export_type([inflight/1, inflight/0]). %%-------------------------------------------------------------------- %% APIs @@ -57,29 +57,30 @@ new(MaxInflight) -> #{max_inflight => MaxInflight, sent => #{}, seq => 1}. --spec(insert(id(), req(), inflight()) -> error | inflight()). +-spec(insert(id(), req(), inflight()) -> error | {ok, inflight()}). insert(Id, Req, Inflight = #{max_inflight := Max, sent := Sent, seq := Seq}) -> case maps:size(Sent) >= Max of true -> error; false -> - Inflight#{sent := maps:put(Id, {Seq, Req}, Sent), seq := Seq + 1} + {ok, Inflight#{sent := maps:put(Id, {Seq, Req}, Sent), + seq := Seq + 1}} end. --spec(update(id(), req(), inflight()) -> inflight()). +-spec(update(id(), req(), inflight()) -> error | {ok, inflight()}). update(Id, Req, Inflight = #{sent := Sent}) -> case maps:find(Id, Sent) of error -> error; {ok, {No, _OldReq}} -> - Inflight#{sent := maps:put(Id, {No, Req}, Sent)} + {ok, Inflight#{sent := maps:put(Id, {No, Req}, Sent)}} end. --spec(delete(id(), inflight()) -> error | {req(), inflight()}). +-spec(delete(id(), inflight()) -> error | {{value, req()}, inflight()}). delete(Id, Inflight = #{sent := Sent}) -> case maps:take(Id, Sent) of error -> error; {{_, Req}, Sent1} -> - {Req, Inflight#{sent := Sent1}} + {{value, Req}, maybe_reset_seq(Inflight#{sent := Sent1})} end. -spec(size(inflight()) -> non_neg_integer()). @@ -93,8 +94,6 @@ is_full(#{max_inflight := Max, sent := Sent}) -> maps:size(Sent) >= Max. -spec(is_empty(inflight()) -> boolean()). -is_empty(#{max_inflight := infinity}) -> - false; is_empty(#{sent := Sent}) -> maps:size(Sent) =< 0. @@ -108,9 +107,9 @@ foreach(F, #{sent := Sent}) -> ). %% @doc Return a sorted list of Pred returned true --spec(retry(Pred, inflight()) -> list({id(), req()}) when +-spec(to_retry_list(Pred, inflight()) -> list({id(), req()}) when Pred :: fun((id(), req()) -> boolean())). -retry(Pred, #{sent := Sent}) -> +to_retry_list(Pred, #{sent := Sent}) -> Need = sort_sent(filter_sent(fun(Id, Req) -> Pred(Id, Req) end, Sent)), lists:map(fun({Id, {_SeqNo, Req}}) -> {Id, Req} end, Need). @@ -120,7 +119,7 @@ retry(Pred, #{sent := Sent}) -> filter_sent(F, Sent) -> maps:filter(fun(Id, {_SeqNo, Req}) -> F(Id, Req) end, Sent). -%% @doc sort with seqno +%% @doc sort with seq sort_sent(Sent) -> Sort = fun({_Id1, {SeqNo1, _Req1}}, {_Id2, {SeqNo2, _Req2}}) -> @@ -128,6 +127,15 @@ sort_sent(Sent) -> end, lists:sort(Sort, maps:to_list(Sent)). +%% @doc avoid integer overflows +maybe_reset_seq(Inflight) -> + case is_empty(Inflight) of + true -> + Inflight#{seq := 1}; + false -> + Inflight + end. + %%-------------------------------------------------------------------- %% tests @@ -135,19 +143,19 @@ sort_sent(Sent) -> insert_delete_test() -> Inflight = emqtt_inflight:new(2), - Inflight1 = emqtt_inflight:insert(1, req1, Inflight), - Inflight2 = emqtt_inflight:insert(2, req2, Inflight1), + {ok, Inflight1} = emqtt_inflight:insert(1, req1, Inflight), + {ok, Inflight2} = emqtt_inflight:insert(2, req2, Inflight1), error = emqtt_inflight:insert(3, req3, Inflight2), error = emqtt_inflight:delete(3, Inflight), - {req2, _} = emqtt_inflight:delete(2, Inflight2). + {{value, req2}, _} = emqtt_inflight:delete(2, Inflight2). update_test() -> Inflight = emqtt_inflight:new(2), - Inflight1 = emqtt_inflight:insert(1, req1, Inflight), + {ok, Inflight1} = emqtt_inflight:insert(1, req1, Inflight), error = emqtt_inflight:update(2, req2, Inflight1), - Inflight11 = emqtt_inflight:update(1, req11, Inflight1), - {req11, _} = emqtt_inflight:delete(1, Inflight11). + {ok, Inflight11} = emqtt_inflight:update(1, req11, Inflight1), + {{value, req11}, _} = emqtt_inflight:delete(1, Inflight11). size_full_empty_test() -> Inflight = emqtt_inflight:new(1), @@ -155,13 +163,13 @@ size_full_empty_test() -> true = emqtt_inflight:is_empty(Inflight), false = emqtt_inflight:is_full(Inflight), - Inflight1 = emqtt_inflight:insert(1, req1, Inflight), + {ok, Inflight1} = emqtt_inflight:insert(1, req1, Inflight), 1 = emqtt_inflight:size(Inflight1), false = emqtt_inflight:is_empty(Inflight1), true = emqtt_inflight:is_full(Inflight1), false = emqtt_inflight:is_full(emqtt_inflight:new(infinity)), - false = emqtt_inflight:is_empty(emqtt_inflight:new(infinity)). + true = emqtt_inflight:is_empty(emqtt_inflight:new(infinity)). foreach_test() -> emqtt_inflight:foreach( @@ -171,21 +179,33 @@ foreach_test() -> retry_test() -> [{"sorted by insert sequence", - [{1, 1}, {2, 2}] = emqtt_inflight:retry( + [{1, 1}, {2, 2}] = emqtt_inflight:to_retry_list( fun(Id, Req) -> Id =:= Req end, inflight_example() ) }, {"filter", - [{2, 2}] = emqtt_inflight:retry( + [{2, 2}] = emqtt_inflight:to_retry_list( fun(Id, _Req) -> Id =:= 2 end, inflight_example()) }]. +reset_seq_test() -> + Inflight = emqtt_inflight:new(infinity), + #{seq := 1} = Inflight, + + {ok, Inflight1} = emqtt_inflight:insert(1, req1, Inflight), + #{seq := 2} = Inflight1, + + {_, Inflight2} = emqtt_inflight:delete(1, Inflight1), + + %% reset seq to 1 once inflight is empty + true = emqtt_inflight:is_empty(Inflight2), + #{seq := 1} = Inflight2. + inflight_example() -> - emqtt_inflight:insert( - 2, 2, - emqtt_inflight:insert(1, 1, emqtt_inflight:new(infinity)) - ). + {ok, Inflight} = emqtt_inflight:insert(1, 1, emqtt_inflight:new(infinity)), + {ok, Inflight1} = emqtt_inflight:insert(2, 2, Inflight), + Inflight1. -endif. diff --git a/test/emqtt_SUITE.erl b/test/emqtt_SUITE.erl index eb1ff3c5..4c20981f 100644 --- a/test/emqtt_SUITE.erl +++ b/test/emqtt_SUITE.erl @@ -1,4 +1,4 @@ -%to %-------------------------------------------------------------------- +%%-------------------------------------------------------------------- %% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License");