Skip to content

Commit

Permalink
Use cow_http2_machine:ensure_window
Browse files Browse the repository at this point in the history
Gun was very inefficient at receiving HTTP/2 bodies. Switching
to ensure_window and increasing the default window sizes brings
the response body reading performance at least on par with the
one for HTTP/1.1.

This has a small negative impact on message flow control because
we stop updating the window later than we did before, increasing
the number of extra messages we may send. The exact amount depends
on configuration and the exact moment flow control kicks in.
  • Loading branch information
essen committed Sep 13, 2019
1 parent 585c1dc commit 4194682
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 48 deletions.
98 changes: 59 additions & 39 deletions src/gun_http2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@

%% Flow control.
flow :: integer() | infinity,
flow_window = 0 :: non_neg_integer(),

%% Content handlers state.
handler_state :: undefined | gun_content_handler:state()
Expand Down Expand Up @@ -87,14 +86,24 @@ do_check_options([{keepalive, infinity}|Opts]) ->
do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 ->
do_check_options(Opts);
%% @todo Add all http2_machine options.
do_check_options([{initial_connection_window_size, _}|Opts]) ->
do_check_options(Opts);
do_check_options([{initial_stream_window_size, _}|Opts]) ->
do_check_options(Opts);
do_check_options([{max_frame_size_received, _}|Opts]) ->
do_check_options(Opts);
do_check_options([Opt|_]) ->
{error, {options, {http2, Opt}}}.

name() -> http2.

init(Owner, Socket, Transport, Opts) ->
init(Owner, Socket, Transport, Opts0) ->
%% We have different defaults than the protocol in order
%% to optimize for performance when receiving responses.
Opts = Opts0#{
initial_connection_window_size => maps:get(initial_connection_window_size, Opts0, 8000000),
initial_stream_window_size => maps:get(initial_stream_window_size, Opts0, 8000000)
},
{ok, Preface, HTTP2Machine} = cow_http2_machine:init(client, Opts),
Handlers = maps:get(content_handlers, Opts, [gun_data_h]),
%% @todo Better validate the preface being received.
Expand Down Expand Up @@ -209,62 +218,47 @@ maybe_ack(State=#http2_state{socket=Socket, transport=Transport}, Frame) ->
end,
State.

lingering_data_frame(State=#http2_state{socket=Socket, transport=Transport,
http2_machine=HTTP2Machine0}, DataLen) ->
Transport:send(Socket, cow_http2:window_update(DataLen)),
HTTP2Machine1 = cow_http2_machine:update_window(DataLen, HTTP2Machine0),
State#http2_state{http2_machine=HTTP2Machine1}.
lingering_data_frame(State, _DataLen) ->
%% We only update the connection's window when receiving
%% a lingering data frame.
update_window(State).

data_frame(State=#http2_state{socket=Socket, transport=Transport,
http2_machine=HTTP2Machine0}, StreamID, IsFin, Data,
EvHandler, EvHandlerState0) ->
data_frame(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0) ->
Stream = #stream{ref=StreamRef, reply_to=ReplyTo, flow=Flow0,
flow_window=FlowWindow0, handler_state=Handlers0} = get_stream_by_id(State, StreamID),
handler_state=Handlers0} = get_stream_by_id(State0, StreamID),
{ok, Dec, Handlers} = gun_content_handler:handle(IsFin, Data, Handlers0),
Flow = case Flow0 of
infinity -> infinity;
_ -> Flow0 - Dec
end,
Size = byte_size(Data),
FlowWindow = if
IsFin =:= nofin, Flow =< 0 ->
FlowWindow0 + Size;
true ->
FlowWindow0
end,
{HTTP2Machine, EvHandlerState} = case Size of
State1 = store_stream(State0, Stream#stream{flow=Flow, handler_state=Handlers}),
{State, EvHandlerState} = case byte_size(Data) of
%% We do not send a WINDOW_UPDATE if the DATA frame was of size 0.
0 when IsFin =:= fin ->
EvHandlerState1 = EvHandler:response_end(#{
stream_ref => StreamRef,
reply_to => ReplyTo
}, EvHandlerState0),
{HTTP2Machine0, EvHandlerState1};
{State1, EvHandlerState1};
0 ->
{HTTP2Machine0, EvHandlerState0};
{State1, EvHandlerState0};
_ ->
Transport:send(Socket, cow_http2:window_update(Size)),
HTTP2Machine1 = cow_http2_machine:update_window(Size, HTTP2Machine0),
%% We do not send a stream WINDOW_UPDATE when the flow control kicks in
%% (it'll be sent when the flow recovers) or for the last DATA frame.
case IsFin of
nofin when Flow =< 0 ->
{HTTP2Machine1, EvHandlerState0};
{update_window(State1), EvHandlerState0};
nofin ->
Transport:send(Socket, cow_http2:window_update(StreamID, Size)),
{cow_http2_machine:update_window(StreamID, Size, HTTP2Machine1),
EvHandlerState0};
{update_window(State1, StreamID), EvHandlerState0};
fin ->
EvHandlerState1 = EvHandler:response_end(#{
stream_ref => StreamRef,
reply_to => ReplyTo
}, EvHandlerState0),
{HTTP2Machine1, EvHandlerState1}
{update_window(State1), EvHandlerState1}
end
end,
{maybe_delete_stream(store_stream(State#http2_state{http2_machine=HTTP2Machine},
Stream#stream{flow=Flow, flow_window=FlowWindow, handler_state=Handlers}), StreamID, remote, IsFin),
EvHandlerState}.
{maybe_delete_stream(State, StreamID, remote, IsFin), EvHandlerState}.

headers_frame(State=#http2_state{content_handlers=Handlers0},
StreamID, IsFin, Headers, PseudoHeaders, _BodyLen,
Expand Down Expand Up @@ -378,28 +372,54 @@ ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) ->
connection_error(State#http2_state{http2_machine=HTTP2Machine}, Error)
end.

update_flow(State=#http2_state{socket=Socket, transport=Transport,
http2_machine=HTTP2Machine0}, _ReplyTo, StreamRef, Inc) ->
update_flow(State, _ReplyTo, StreamRef, Inc) ->
case get_stream_by_ref(State, StreamRef) of
Stream=#stream{id=StreamID, flow=Flow0, flow_window=FlowWindow} ->
Stream=#stream{id=StreamID, flow=Flow0} ->
Flow = case Flow0 of
infinity -> infinity;
_ -> Flow0 + Inc
end,
if
%% Flow is active again, update the window.
%% Flow is active again, update the stream's window.
Flow0 =< 0, Flow > 0 ->
Transport:send(Socket, cow_http2:window_update(StreamID, FlowWindow)),
HTTP2Machine = cow_http2_machine:update_window(StreamID, FlowWindow, HTTP2Machine0),
{state, store_stream(State#http2_state{http2_machine=HTTP2Machine},
Stream#stream{flow=Flow, flow_window=0})};
{state, update_window(store_stream(State,
Stream#stream{flow=Flow}), StreamID)};
true ->
{state, store_stream(State, Stream#stream{flow=Flow})}
end;
false ->
[]
end.

%% Only update the connection's window.
update_window(State=#http2_state{socket=Socket, transport=Transport,
opts=#{initial_connection_window_size := ConnWindow}, http2_machine=HTTP2Machine0}) ->
case cow_http2_machine:ensure_window(ConnWindow, HTTP2Machine0) of
ok ->
State;
{ok, Increment, HTTP2Machine} ->
Transport:send(Socket, cow_http2:window_update(Increment)),
State#http2_state{http2_machine=HTTP2Machine}
end.

%% Update both the connection and the stream's window.
update_window(State=#http2_state{socket=Socket, transport=Transport,
opts=#{initial_connection_window_size := ConnWindow, initial_stream_window_size := StreamWindow},
http2_machine=HTTP2Machine0}, StreamID) ->
{Data1, HTTP2Machine2} = case cow_http2_machine:ensure_window(ConnWindow, HTTP2Machine0) of
ok -> {<<>>, HTTP2Machine0};
{ok, Increment1, HTTP2Machine1} -> {cow_http2:window_update(Increment1), HTTP2Machine1}
end,
{Data2, HTTP2Machine} = case cow_http2_machine:ensure_window(StreamID, StreamWindow, HTTP2Machine2) of
ok -> {<<>>, HTTP2Machine2};
{ok, Increment2, HTTP2Machine3} -> {cow_http2:window_update(StreamID, Increment2), HTTP2Machine3}
end,
case {Data1, Data2} of
{<<>>, <<>>} -> ok;
_ -> Transport:send(Socket, [Data1, Data2])
end,
State#http2_state{http2_machine=HTTP2Machine}.

%% We may have to cancel streams even if we receive multiple
%% GOAWAY frames as the LastStreamID value may be lower than
%% the one previously received.
Expand Down
29 changes: 22 additions & 7 deletions test/flow_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ default_flow_http2(_) ->
flow => 1,
%% We set the max frame size to the same as the initial
%% window size in order to reduce the number of data messages.
initial_connection_window_size => 65535,
initial_stream_window_size => 65535,
max_frame_size_received => 65535
},
protocols => [http2]
Expand All @@ -86,8 +88,11 @@ default_flow_http2(_) ->
%% Then we confirm that we can override it per request.
StreamRef2 = gun:get(ConnPid, "/", [], #{flow => 2}),
{response, nofin, 200, _} = gun:await(ConnPid, StreamRef2),
%% We set the flow to 2 therefore we will receive *3* data messages
%% and then nothing because two windows have been fully consumed.
%% We set the flow to 2 but due to the ensure_window algorithm
%% we end up receiving *5* data messages before flow control kicks in,
%% equivalent to 3 SSE events.
{data, nofin, _} = gun:await(ConnPid, StreamRef2),
{data, nofin, _} = gun:await(ConnPid, StreamRef2),
{data, nofin, _} = gun:await(ConnPid, StreamRef2),
{data, nofin, _} = gun:await(ConnPid, StreamRef2),
{data, nofin, _} = gun:await(ConnPid, StreamRef2),
Expand Down Expand Up @@ -132,7 +137,11 @@ flow_http2(_) ->
{ok, ConnPid} = gun:open("localhost", Port, #{
%% We set the max frame size to the same as the initial
%% window size in order to reduce the number of data messages.
http2_opts => #{max_frame_size_received => 65535},
http2_opts => #{
initial_connection_window_size => 65535,
initial_stream_window_size => 65535,
max_frame_size_received => 65535
},
protocols => [http2]
}),
{ok, http2} = gun:await_up(ConnPid),
Expand All @@ -145,14 +154,16 @@ flow_http2(_) ->
%% We consumed all the window available.
65535 = byte_size(D1) + byte_size(D2),
{error, timeout} = gun:await(ConnPid, StreamRef, 3000),
%% We then update the flow and get *3* more data messages but no more.
%% We then update the flow and get *5* more data messages but no more.
gun:update_flow(ConnPid, StreamRef, 2),
{data, nofin, D3} = gun:await(ConnPid, StreamRef),
{data, nofin, D4} = gun:await(ConnPid, StreamRef),
{data, nofin, D5} = gun:await(ConnPid, StreamRef),
{data, nofin, D6} = gun:await(ConnPid, StreamRef),
{data, nofin, D7} = gun:await(ConnPid, StreamRef),
%% We consumed all the window available again.
%% D3 is the end of the truncated D2, D4 is full and D5 truncated.
65535 = byte_size(D3) + byte_size(D4) + byte_size(D5),
%% D3 is the end of the truncated D2, D4, D5 and D6 are full and D7 truncated.
131070 = byte_size(D3) + byte_size(D4) + byte_size(D5) + byte_size(D6) + byte_size(D7),
{error, timeout} = gun:await(ConnPid, StreamRef, 1000),
gun:close(ConnPid)
after
Expand Down Expand Up @@ -302,6 +313,8 @@ sse_flow_http2(_) ->
%% window size in order to reduce the number of data messages.
http2_opts => #{
content_handlers => [gun_sse_h, gun_data_h],
initial_connection_window_size => 65535,
initial_stream_window_size => 65535,
max_frame_size_received => 65535
},
protocols => [http2]
Expand All @@ -314,10 +327,12 @@ sse_flow_http2(_) ->
%% the second event was fully received.
{sse, _} = gun:await(ConnPid, StreamRef),
{error, timeout} = gun:await(ConnPid, StreamRef, 3000),
%% We then update the flow and get 2 more event messages but no more.
%% We then update the flow and get 3 more event messages but no more.
%% We get an extra message because of the ensure_window algorithm.
gun:update_flow(ConnPid, StreamRef, 2),
{sse, _} = gun:await(ConnPid, StreamRef),
{sse, _} = gun:await(ConnPid, StreamRef),
{sse, _} = gun:await(ConnPid, StreamRef),
{error, timeout} = gun:await(ConnPid, StreamRef, 1000),
gun:close(ConnPid)
after
Expand Down
2 changes: 2 additions & 0 deletions test/gun_test.erl
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ http2_handshake(Socket, Transport) ->
%% Receive the SETTINGS from the preface.
{ok, <<Len:24>>} = Transport:recv(Socket, 3, 5000),
{ok, <<4:8, 0:40, _:Len/binary>>} = Transport:recv(Socket, 6 + Len, 5000),
%% Receive the WINDOW_UPDATE sent with the preface.
{ok, <<4:24, 8:8, 0:40, _:32>>} = Transport:recv(Socket, 13, 5000),
%% Send the SETTINGS ack.
ok = Transport:send(Socket, cow_http2:settings_ack()),
%% Receive the SETTINGS ack.
Expand Down
12 changes: 10 additions & 2 deletions test/rfc7540_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,17 @@ lingering_data_counts_toward_connection_window(_) ->
%% Skip RST_STREAM.
{ok, << 4:24, 3:8, 1:40, _:32 >>} = gen_tcp:recv(Socket, 13, 1000),
%% Received a WINDOW_UPDATE frame after we got RST_STREAM.
{ok, << 4:24, 8:8, 0:40, 1000:32 >>} = gen_tcp:recv(Socket, 13, 1000)
{ok, << 4:24, 8:8, 0:40, Increment:32 >>} = gen_tcp:recv(Socket, 13, 1000),
true = Increment > 0
end),
{ok, ConnPid} = gun:open("localhost", Port, #{protocols => [http2]}),
{ok, ConnPid} = gun:open("localhost", Port, #{
protocols => [http2],
http2_opts => #{
%% We don't set 65535 because we still want to have an initial WINDOW_UPDATE.
initial_connection_window_size => 65536,
initial_stream_window_size => 65535
}
}),
{ok, http2} = gun:await_up(ConnPid),
timer:sleep(100), %% Give enough time for the handshake to fully complete.
%% Step 1.
Expand Down

0 comments on commit 4194682

Please sign in to comment.