Skip to content

Commit

Permalink
refactor: safer return type for emqtt_inflight
Browse files Browse the repository at this point in the history
  • Loading branch information
HJianBo committed Aug 24, 2022
1 parent 4b7d949 commit 0cfa735
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 62 deletions.
70 changes: 36 additions & 34 deletions src/emqtt.erl
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,6 @@

-type(sent_at() :: non_neg_integer()). %% in millisecond


-export_type([publish_success/0, publish_reply/0]).

-define(ASYNC_PUB(Msg, ExpireAt, Callback),
Expand Down Expand Up @@ -1098,12 +1097,10 @@ connected(info, ?ASYNC_PUB(Msg = #mqtt_msg{qos = ?QOS_0}, ExpireAt, Callback), S
shoot(?PUB_REQ(Msg, ExpireAt, Callback), State0);

connected(info, ?ASYNC_PUB(Msg = #mqtt_msg{qos = QoS}, ExpireAt, Callback),
State0 = #state{pendings = Pendings})
State0 = #state{pendings = Pendings0})
when QoS == ?QOS_1; QoS == ?QOS_2 ->
maybe_shoot(
State0#state{
pendings = enqueue_publish_req(?PUB_REQ(Msg, ExpireAt, Callback), Pendings)
});
Pendings = enqueue_publish_req(?PUB_REQ(Msg, ExpireAt, Callback), Pendings0),
maybe_shoot(State0#state{pendings = Pendings});

connected(EventType, EventContent, Data) ->
handle_event(EventType, EventContent, connected, Data).
Expand Down Expand Up @@ -1335,11 +1332,11 @@ shoot(?PUB_REQ(Msg = #mqtt_msg{qos = QoS}, ExpireAt, Callback),
Msg1 = Msg#mqtt_msg{packet_id = PacketId},
case send(Msg1, State) of
{ok, NState} ->
Inflight1 = emqtt_inflight:insert(
PacketId,
?INFLIGHT_PUBLISH(Msg1, now_ts(), ExpireAt, Callback),
Inflight
),
{ok, Inflight1} = emqtt_inflight:insert(
PacketId,
?INFLIGHT_PUBLISH(Msg1, now_ts(), ExpireAt, Callback),
Inflight
),
State1 = ensure_retry_timer(NState#state{inflight = Inflight1}),
maybe_shoot(bump_last_packet_id(State1));
{error, Reason} ->
Expand All @@ -1364,7 +1361,7 @@ ack_inflight(
State = #state{inflight = Inflight}
) ->
case emqtt_inflight:delete(PacketId, Inflight) of
{?INFLIGHT_PUBLISH(_Msg, _SentAt, _ExpireAt, Callback), NInflight} ->
{{value, ?INFLIGHT_PUBLISH(_Msg, _SentAt, _ExpireAt, Callback)}, NInflight} ->
eval_callback_handler(
{ok, #{packet_id => PacketId,
reason_code => ReasonCode,
Expand All @@ -1382,18 +1379,19 @@ ack_inflight(
State = #state{inflight = Inflight}
) ->
case emqtt_inflight:delete(PacketId, Inflight) of
{?INFLIGHT_PUBLISH(_Msg, _SentAt, ExpireAt, Callback), Inflight1} ->
{{value, ?INFLIGHT_PUBLISH(_Msg, _SentAt, ExpireAt, Callback)}, Inflight1} ->
eval_callback_handler(
{ok, #{packet_id => PacketId,
reason_code => ReasonCode,
reason_code_name => reason_code_name(ReasonCode),
properties => Properties}}, Callback),
NInflight = emqtt_inflight:insert(
PacketId,
?INFLIGHT_PUBREL(PacketId, now_ts(), ExpireAt),
Inflight1),
{ok, NInflight} = emqtt_inflight:insert(
PacketId,
?INFLIGHT_PUBREL(PacketId, now_ts(), ExpireAt),
Inflight1
),
State#state{inflight = NInflight};
{?INFLIGHT_PUBREL(_PacketId, _SentAt, _ExpireAt), _} ->
{{value, ?INFLIGHT_PUBREL(_PacketId, _SentAt, _ExpireAt)}, _} ->
?LOG(notice, "duplicated_PUBREC_packet", #{packet_id => PacketId}, State),
State;
error ->
Expand All @@ -1406,7 +1404,7 @@ ack_inflight(
State = #state{inflight = Inflight}
) ->
case emqtt_inflight:delete(PacketId, Inflight) of
{?INFLIGHT_PUBREL(_PacketId, _SentAt, _ExpireAt), NInflight} ->
{{value, ?INFLIGHT_PUBREL(_PacketId, _SentAt, _ExpireAt)}, NInflight} ->
State#state{inflight = NInflight};
error ->
?LOG(warning, "unexpected_PUBCOMP", #{packet_id => PacketId}, State),
Expand Down Expand Up @@ -1524,33 +1522,37 @@ retry_send(State = #state{retry_interval = Intv, inflight = Inflight}) ->
try
Now = now_ts(),
Pred = fun(_, InflightReq) ->
(sent_at(InflightReq) + timer:seconds(Intv)) > Now end,
NInflight = lists:foldl(
fun({PacketId, InflightReq}, InflightAcc) ->
NReq = retry_send(Now, InflightReq, State),
emqtt_inflight:update(PacketId, NReq, InflightAcc)
end, Inflight, emqtt_inflight:retry(Pred, Inflight)
),
{keep_state, ensure_retry_timer(State#state{inflight = NInflight})}
(sent_at(InflightReq) + timer:seconds(Intv)) > Now
end,
NState = retry_send(Now, emqtt_inflight:to_retry_list(Pred, Inflight), State),
{keep_state, ensure_retry_timer(NState)}
catch error : Reason ->
shutdown(Reason, State)
end.

retry_send(Now, ?INFLIGHT_PUBLISH(Msg, _SentAt, _ExpireAt, _Callback), State) ->
retry_send(Now, [{PacketId, ?INFLIGHT_PUBLISH(Msg, _, ExpireAt, Callback)} | More],
State = #state{inflight = Inflight}) ->
Msg1 = Msg#mqtt_msg{dup = true},
case send(Msg1, State) of
{ok, _NewState} ->
?INFLIGHT_PUBLISH(Msg1, Now, _ExpireAt, _Callback);
{ok, NState} ->
NInflightReq = ?INFLIGHT_PUBLISH(Msg1, Now, ExpireAt, Callback),
{ok, NInflight} = emqtt_inflight:update(PacketId, NInflightReq, Inflight),
retry_send(Now, More, NState#state{inflight = NInflight});
{error, Reason} ->
error(Reason)
end;
retry_send(Now, ?INFLIGHT_PUBREL(PacketId, _SentAt, _ExpireAt), State) ->
retry_send(Now, [{PacketId, ?INFLIGHT_PUBREL(PacketId, _, ExpireAt)} | More],
State = #state{inflight = Inflight}) ->
case send(?PUBREL_PACKET(PacketId), State) of
{ok, _NewState} ->
?INFLIGHT_PUBREL(PacketId, Now, _ExpireAt);
{ok, NState} ->
NInflightReq = ?INFLIGHT_PUBREL(PacketId, Now, ExpireAt),
{ok, NInflight} = emqtt_inflight:update(PacketId, NInflightReq, Inflight),
retry_send(Now, More, NState#state{inflight = NInflight});
{error, Reason} ->
error(Reason)
end.
end;
retry_send(_Now, [], State) ->
State.

deliver(#mqtt_msg{qos = QoS, dup = Dup, retain = Retain, packet_id = PacketId,
topic = Topic, props = Props, payload = Payload},
Expand Down
74 changes: 47 additions & 27 deletions src/emqtt_inflight.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
, is_full/1
, is_empty/1
, foreach/2
, retry/2
, to_retry_list/2
]).

-type(inflight() :: inflight(req())).
Expand All @@ -48,7 +48,7 @@

-type(req() :: term()).

-export_type([inflight/1]).
-export_type([inflight/1, inflight/0]).

%%--------------------------------------------------------------------
%% APIs
Expand All @@ -57,29 +57,30 @@
new(MaxInflight) ->
#{max_inflight => MaxInflight, sent => #{}, seq => 1}.

-spec(insert(id(), req(), inflight()) -> error | inflight()).
-spec(insert(id(), req(), inflight()) -> error | {ok, inflight()}).
insert(Id, Req, Inflight = #{max_inflight := Max, sent := Sent, seq := Seq}) ->
case maps:size(Sent) >= Max of
true ->
error;
false ->
Inflight#{sent := maps:put(Id, {Seq, Req}, Sent), seq := Seq + 1}
{ok, Inflight#{sent := maps:put(Id, {Seq, Req}, Sent),
seq := Seq + 1}}
end.

-spec(update(id(), req(), inflight()) -> inflight()).
-spec(update(id(), req(), inflight()) -> error | {ok, inflight()}).
update(Id, Req, Inflight = #{sent := Sent}) ->
case maps:find(Id, Sent) of
error -> error;
{ok, {No, _OldReq}} ->
Inflight#{sent := maps:put(Id, {No, Req}, Sent)}
{ok, Inflight#{sent := maps:put(Id, {No, Req}, Sent)}}
end.

-spec(delete(id(), inflight()) -> error | {req(), inflight()}).
-spec(delete(id(), inflight()) -> error | {{value, req()}, inflight()}).
delete(Id, Inflight = #{sent := Sent}) ->
case maps:take(Id, Sent) of
error -> error;
{{_, Req}, Sent1} ->
{Req, Inflight#{sent := Sent1}}
{{value, Req}, maybe_reset_seq(Inflight#{sent := Sent1})}
end.

-spec(size(inflight()) -> non_neg_integer()).
Expand All @@ -93,8 +94,6 @@ is_full(#{max_inflight := Max, sent := Sent}) ->
maps:size(Sent) >= Max.

-spec(is_empty(inflight()) -> boolean()).
is_empty(#{max_inflight := infinity}) ->
false;
is_empty(#{sent := Sent}) ->
maps:size(Sent) =< 0.

Expand All @@ -108,9 +107,9 @@ foreach(F, #{sent := Sent}) ->
).

%% @doc Return a sorted list of Pred returned true
-spec(retry(Pred, inflight()) -> list({id(), req()}) when
-spec(to_retry_list(Pred, inflight()) -> list({id(), req()}) when
Pred :: fun((id(), req()) -> boolean())).
retry(Pred, #{sent := Sent}) ->
to_retry_list(Pred, #{sent := Sent}) ->
Need = sort_sent(filter_sent(fun(Id, Req) -> Pred(Id, Req) end, Sent)),
lists:map(fun({Id, {_SeqNo, Req}}) -> {Id, Req} end, Need).

Expand All @@ -120,48 +119,57 @@ retry(Pred, #{sent := Sent}) ->
filter_sent(F, Sent) ->
maps:filter(fun(Id, {_SeqNo, Req}) -> F(Id, Req) end, Sent).

%% @doc sort with seqno
%% @doc sort with seq
sort_sent(Sent) ->
Sort = fun({_Id1, {SeqNo1, _Req1}},
{_Id2, {SeqNo2, _Req2}}) ->
SeqNo1 < SeqNo2
end,
lists:sort(Sort, maps:to_list(Sent)).

%% @doc avoid integer overflows
maybe_reset_seq(Inflight) ->
case is_empty(Inflight) of
true ->
Inflight#{seq := 1};
false ->
Inflight
end.

%%--------------------------------------------------------------------
%% tests

-ifdef(TEST).

insert_delete_test() ->
Inflight = emqtt_inflight:new(2),
Inflight1 = emqtt_inflight:insert(1, req1, Inflight),
Inflight2 = emqtt_inflight:insert(2, req2, Inflight1),
{ok, Inflight1} = emqtt_inflight:insert(1, req1, Inflight),
{ok, Inflight2} = emqtt_inflight:insert(2, req2, Inflight1),
error = emqtt_inflight:insert(3, req3, Inflight2),
error = emqtt_inflight:delete(3, Inflight),
{req2, _} = emqtt_inflight:delete(2, Inflight2).
{{value, req2}, _} = emqtt_inflight:delete(2, Inflight2).

update_test() ->
Inflight = emqtt_inflight:new(2),
Inflight1 = emqtt_inflight:insert(1, req1, Inflight),
{ok, Inflight1} = emqtt_inflight:insert(1, req1, Inflight),
error = emqtt_inflight:update(2, req2, Inflight1),

Inflight11 = emqtt_inflight:update(1, req11, Inflight1),
{req11, _} = emqtt_inflight:delete(1, Inflight11).
{ok, Inflight11} = emqtt_inflight:update(1, req11, Inflight1),
{{value, req11}, _} = emqtt_inflight:delete(1, Inflight11).

size_full_empty_test() ->
Inflight = emqtt_inflight:new(1),
0 = emqtt_inflight:size(Inflight),
true = emqtt_inflight:is_empty(Inflight),
false = emqtt_inflight:is_full(Inflight),

Inflight1 = emqtt_inflight:insert(1, req1, Inflight),
{ok, Inflight1} = emqtt_inflight:insert(1, req1, Inflight),
1 = emqtt_inflight:size(Inflight1),
false = emqtt_inflight:is_empty(Inflight1),
true = emqtt_inflight:is_full(Inflight1),

false = emqtt_inflight:is_full(emqtt_inflight:new(infinity)),
false = emqtt_inflight:is_empty(emqtt_inflight:new(infinity)).
true = emqtt_inflight:is_empty(emqtt_inflight:new(infinity)).

foreach_test() ->
emqtt_inflight:foreach(
Expand All @@ -171,21 +179,33 @@ foreach_test() ->

retry_test() ->
[{"sorted by insert sequence",
[{1, 1}, {2, 2}] = emqtt_inflight:retry(
[{1, 1}, {2, 2}] = emqtt_inflight:to_retry_list(
fun(Id, Req) -> Id =:= Req end,
inflight_example()
)
},
{"filter",
[{2, 2}] = emqtt_inflight:retry(
[{2, 2}] = emqtt_inflight:to_retry_list(
fun(Id, _Req) -> Id =:= 2 end,
inflight_example())
}].

reset_seq_test() ->
Inflight = emqtt_inflight:new(infinity),
#{seq := 1} = Inflight,

{ok, Inflight1} = emqtt_inflight:insert(1, req1, Inflight),
#{seq := 2} = Inflight1,

{_, Inflight2} = emqtt_inflight:delete(1, Inflight1),

%% reset seq to 1 once inflight is empty
true = emqtt_inflight:is_empty(Inflight2),
#{seq := 1} = Inflight2.

inflight_example() ->
emqtt_inflight:insert(
2, 2,
emqtt_inflight:insert(1, 1, emqtt_inflight:new(infinity))
).
{ok, Inflight} = emqtt_inflight:insert(1, 1, emqtt_inflight:new(infinity)),
{ok, Inflight1} = emqtt_inflight:insert(2, 2, Inflight),
Inflight1.

-endif.
2 changes: 1 addition & 1 deletion test/emqtt_SUITE.erl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
%to %--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
Expand Down

0 comments on commit 0cfa735

Please sign in to comment.