Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add telemetry handler #3

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,9 @@ See [opencensus-erlang](https://github.com/census-instrumentation/opencensus-erl

#### Statistics

OpenCensus
----

Statistics are collected by implementing a stats handler module. A handler for OpenCensus stats (be sure to include [OpenCensus](https://hex.pm/packages/opencensus) as a dependency and make sure it starts on boot) is provided and can be enabled for the server with a config option:

``` erlang
Expand All @@ -258,6 +261,13 @@ For actual use, an [exporter for Prometheus](https://github.com/opencensus-beam/

Details on all the metrics that are collected can be found in the [OpenCensus gRPC Stats specification](https://github.com/census-instrumentation/opencensus-specs/blob/master/stats/gRPC.md).

Telemetry
----

Metrics can also be collected and emitted for downstream handling by your application using the [Telemetry](https://hex.pm/packages/telemetry) Erlang/Elixir library. When you include Telemetry among your application's dependencies and attach handlers to grpcbox events (and any others emitted by your app or its instrumented dependencies) you can send those events and their metadata to any monitoring and aggragation services by way of those handlers.

The Telemetry handler can be used the same way as the OpenCensus handler, by setting the value of the `stats_handler` key in the `grpc_opts` config map to `grpcbox_telemetry_stats_handler`. The Telemetry handler and OpenCensus handlers are mutually exclusive and cannot be used in conjunction.

#### Metadata

Metadata is sent in headers and trailers.
Expand Down
5 changes: 3 additions & 2 deletions rebar.config
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{erl_opts, [debug_info]}.

{deps, [{chatterbox, {pkg, ts_chatterbox}},
{deps, [
{chatterbox, ".*", {git, "https://github.com/andymck/chatterbox", {branch, "andymck/fix-trailers-close-race-condition"}}},
ctx,
acceptor_pool,
gproc]}.
Expand Down Expand Up @@ -48,7 +49,7 @@
deprecated_function_calls, deprecated_functions]}.

{project_plugins, [covertool,
{grpcbox_plugin, "~> 0.7.0"},
{grpcbox_plugin, {git, "https://github.com/andymck/grpcbox_plugin.git",{branch, "master"}}},
rebar3_lint]}.

{cover_enabled, true}.
Expand Down
7 changes: 4 additions & 3 deletions rebar.lock
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
{"1.2.0",
[{<<"acceptor_pool">>,{pkg,<<"acceptor_pool">>,<<"1.0.0">>},0},
{<<"chatterbox">>,{pkg,<<"ts_chatterbox">>,<<"0.11.0">>},0},
{<<"chatterbox">>,
{git,"https://github.com/andymck/chatterbox",
{ref,"fe0a96049723ac07174cb08ed6d8a1b4f6997be9"}},
0},
{<<"ctx">>,{pkg,<<"ctx">>,<<"0.6.0">>},0},
{<<"gproc">>,{pkg,<<"gproc">>,<<"0.8.0">>},0},
{<<"hpack">>,{pkg,<<"hpack_erl">>,<<"0.2.3">>},1}]}.
[
{pkg_hash,[
{<<"acceptor_pool">>, <<"43C20D2ACAE35F0C2BCD64F9D2BDE267E459F0F3FD23DAB26485BF518C281B21">>},
{<<"chatterbox">>, <<"B8F372C706023EB0DE5BF2976764EDB27C70FE67052C88C1F6A66B3A5626847F">>},
{<<"ctx">>, <<"8FF88B70E6400C4DF90142E7F130625B82086077A45364A78D208ED3ED53C7FE">>},
{<<"gproc">>, <<"CEA02C578589C61E5341FCE149EA36CCEF236CC2ECAC8691FBA408E7EA77EC2F">>},
{<<"hpack">>, <<"17670F83FF984AE6CD74B1C456EDDE906D27FF013740EE4D9EFAA4F1BF999633">>}]},
{pkg_hash_ext,[
{<<"acceptor_pool">>, <<"0CBCD83FDC8B9AD2EEE2067EF8B91A14858A5883CB7CD800E6FCD5803E158788">>},
{<<"chatterbox">>, <<"722FE2BAD52913AB7E87D849FC6370375F0C961FFB2F0B5E6D647C9170C382A6">>},
{<<"ctx">>, <<"A14ED2D1B67723DBEBBE423B28D7615EB0BDCBA6FF28F2D1F1B0A7E1D4AA5FC2">>},
{<<"gproc">>, <<"580ADAFA56463B75263EF5A5DF4C86AF321F68694E7786CB057FD805D1E2A7DE">>},
{<<"hpack">>, <<"06F580167C4B8B8A6429040DF36CC93BBA6D571FAEAEC1B28816523379CBB23A">>}]}
Expand Down
6 changes: 5 additions & 1 deletion src/grpcbox_pool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
-behaviour(acceptor_pool).

-export([start_link/4,
accept_socket/3]).
accept_socket/3,
pool_sockets/1]).

-export([init/1]).

Expand All @@ -13,6 +14,9 @@ start_link(Name, ServerOpts, ChatterboxOpts, TransportOpts) ->
accept_socket(Pool, Socket, Acceptors) ->
acceptor_pool:accept_socket(Pool, Socket, Acceptors).

pool_sockets(Pool) ->
acceptor_pool:which_sockets(Pool).

init([ServerOpts, ChatterboxOpts, TransportOpts]) ->
{Transport, SslOpts} = case TransportOpts of
#{ssl := true,
Expand Down
37 changes: 19 additions & 18 deletions src/grpcbox_reflection_service.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,38 +9,39 @@
#{error_code => 12,
error_message => "unimplemented method since extensions removed in proto3"}}).

server_reflection_info(Ref, Stream) ->
receive
{Ref, eos} ->
ok;
{Ref, Message} ->
handle_message(Message, Stream),
server_reflection_info(Ref, Stream)
end.
server_reflection_info(Message, Stream) ->
handle_message(Message, Stream).

handle_message(eos=_OriginalRequest, Stream) ->
{stop, Stream};
handle_message(#{message_request := {list_services, _}}=OriginalRequest, Stream) ->
Services = list_services(),
grpcbox_stream:send(#{original_request => OriginalRequest,
Stream0 = grpcbox_stream:send(false, #{original_request => OriginalRequest,
message_response => {list_services_response,
#{service => Services}}}, Stream);
#{service => Services}}}, Stream),
{ok, Stream0};
handle_message(#{message_request := {file_by_filename, Filename}}=OriginalRequest, Stream) ->
Response = file_by_filename(Filename),
grpcbox_stream:send(#{original_request => OriginalRequest,
message_response => Response}, Stream);
Stream0 = grpcbox_stream:send(false, #{original_request => OriginalRequest,
message_response => Response}, Stream),
{ok, Stream0};
handle_message(#{message_request := {file_containing_symbol, Symbol}}=OriginalRequest, Stream) ->
Response = file_containing_symbol(Symbol),
grpcbox_stream:send(#{original_request => OriginalRequest,
message_response => Response}, Stream);
Stream0 = grpcbox_stream:send(false, #{original_request => OriginalRequest,
message_response => Response}, Stream),
{ok, Stream0};

%% proto3 dropped extensions so we'll just return an empty result

handle_message(#{message_request := {all_extension_numbers_of_type, _}}=OriginalRequest, Stream) ->
grpcbox_stream:send(#{original_request => OriginalRequest,
Stream0 = grpcbox_stream:send(false, #{original_request => OriginalRequest,
message_response => ?UNIMPLEMENTED_RESPONSE},
Stream);
Stream),
{ok, Stream0};
handle_message(#{message_request := {file_containing_extension, _}}=OriginalRequest, Stream) ->
grpcbox_stream:send(#{original_request => OriginalRequest,
message_response => ?UNIMPLEMENTED_RESPONSE}, Stream).
Stream0 = grpcbox_stream:send(false, #{original_request => OriginalRequest,
message_response => ?UNIMPLEMENTED_RESPONSE}, Stream),
{ok, Stream0}.

%%

Expand Down
24 changes: 16 additions & 8 deletions src/grpcbox_services_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ init([ServerOpts, GrpcOpts, ListenOpts, PoolOpts, TransportOpts, ServiceSupName]
%% unique name for pool based on the ip and port it will listen on
Name = pool_name(ListenOpts),

RestartStrategy = #{strategy => rest_for_one},
RestartStrategy = #{strategy => rest_for_one, intensity => 5, period => 2},
Pool = #{id => grpcbox_pool,
start => {grpcbox_pool, start_link, [Name, chatterbox:settings(server, ServerOpts),
ChatterboxOpts, TransportOpts]}},
Expand Down Expand Up @@ -127,13 +127,21 @@ load_services([], _, _) ->
ok;
load_services([ServicePbModule | Rest], Services, ServicesTable) ->
ServiceNames = ServicePbModule:get_service_names(),
%% NOTE: Methods value may be a map or a prop depending on gpb options when generating the services
[begin
{{service, _}, Methods} = ServicePbModule:get_service_def(ServiceName),
%% throws exception if ServiceName isn't in the map or doesn't exist
try ServiceModule = maps:get(ServiceName, Services),
try
ServiceModule = maps:get(ServiceName, Services),
{ServiceModule, ServiceModule:module_info(exports)} of
{ServiceModule1, Exports} ->
[begin
#{name := Name,
input := Input,
output := Output,
input_stream := InputStream,
output_stream := OutputStream,
opts := Opts} = ensure_map(P),
SnakedMethodName = atom_snake_case(Name),
case lists:member({SnakedMethodName, 2}, Exports) of
true ->
Expand All @@ -149,12 +157,7 @@ load_services([ServicePbModule | Rest], Services, ServicesTable) ->
%% TODO: error? log? insert into ets as unimplemented?
unimplemented_method
end
end || #{name := Name,
input := Input,
output := Output,
input_stream := InputStream,
output_stream := OutputStream,
opts := Opts} <- Methods]
end || P <- Methods]
catch
_:_ ->
%% TODO: error? log? insert into ets as unimplemented?
Expand All @@ -179,3 +182,8 @@ atom_snake_case(Name) ->
Snaked1 = string:replace(Snaked, ".", "_", all),
Snaked2 = string:replace(Snaked1, "__", "_", all),
list_to_atom(string:to_lower(unicode:characters_to_list(Snaked2))).

ensure_map(S) when is_map(S)->
S;
ensure_map(S) when is_list(S)->
maps:from_list(S).
48 changes: 46 additions & 2 deletions src/grpcbox_socket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,51 @@ init([Pool, ListenOpts, PoolOpts]) ->
{ok, Socket} ->
%% acceptor could close the socket if there is a problem
MRef = monitor(port, Socket),
grpcbox_pool:accept_socket(Pool, Socket, AcceptorPoolSize),
{ok, _} = grpcbox_pool:accept_socket(Pool, Socket, AcceptorPoolSize),
{ok, {Socket, MRef}};
{error, eaddrinuse} ->
%% our desired port is already in use
%% its likely this grpcbox_socket server has been killed ( for reason unknown ) and is restarting
%% previously it would have bound to the port before passing control to our acceptor pool
%% the socket remains open
%% in the restart scenario, the socket process would attempt to bind again
%% to the port and then stop, the sup would keep restarting it
%% and we would end up breaching the restart strategy of the parent sup
%% eventually taking down the entire tree
%% result of which is we have no active listener and grpcbox is effectively down
%% so now if we hit eaddrinuse, we check if our acceptor pool using it
%% if so we close the port here and stop this process
%% NOTE: issuing stop in init wont trigger terminate and so cant rely on
%% the socket being closed there
%% This allows the sup to restart things cleanly
%% We could try to reuse the exising port rather than closing it
%% but side effects were encountered there, so deliberately avoiding

%% NOTE: acceptor_pool has a grace period for connections before it terminates
%% grpcbox_pool sets this to a default of 5 secs
%% this needs considered when deciding on related supervisor restart strategies
%% AND keep in mind the acceptor pool will continue accepting new connections
%% during this grace period

%% get the current sockets in use by the acceptor pool
%% if one is bound to our target port then close it
%% need to allow for possibility of multiple services, each with its own socket
%% so we need to identify our interested socket via port number
PoolSockets = grpcbox_pool:pool_sockets(Pool),
MaybeHaveExistingSocket =
lists:foldl(
fun({inet_tcp, {_IP, BoundPortNumber}, Socket, _SockRef}, _Acc) when BoundPortNumber =:= Port ->
{ok, Socket};
(_, Acc) ->
Acc
end, socket_not_found, PoolSockets),
case MaybeHaveExistingSocket of
{ok, Socket} ->
gen_tcp:close(Socket);
socket_not_found ->
noop
end,
{stop, eaddrinuse};
{error, Reason} ->
{stop, Reason}
end.
Expand All @@ -54,9 +97,10 @@ handle_info(_, State) ->
code_change(_, State, _) ->
{ok, State}.

terminate(_, {Socket, MRef}) ->
terminate(_Reason, {Socket, MRef}) ->
%% Socket may already be down but need to ensure it is closed to avoid
%% eaddrinuse error on restart
%% this takes care of that, unless of course this process is killed...
case demonitor(MRef, [flush, info]) of
true -> gen_tcp:close(Socket);
false -> ok
Expand Down
Loading