diff --git a/rebar.lock b/rebar.lock index 66859cdc2..cae513810 100644 --- a/rebar.lock +++ b/rebar.lock @@ -5,7 +5,7 @@ {<<"base64url">>,{pkg,<<"base64url">>,<<"1.0.1">>},1}, {<<"blockchain">>, {git,"https://github.com/helium/blockchain-core.git", - {ref,"774944d6b39e7f96563ecd380503be95df968e3f"}}, + {ref,"619485b2ae4fcadc83aad39a8a516849623bbfda"}}, 0}, {<<"certifi">>,{pkg,<<"certifi">>,<<"2.8.0">>},2}, {<<"chatterbox">>, @@ -86,7 +86,7 @@ {<<"gproc">>,{pkg,<<"gproc">>,<<"0.8.0">>},2}, {<<"grpc_client">>, {git,"https://github.com/Bluehouse-Technology/grpc_client.git", - {ref,"f29516eaf590c63c0c300f98c9b75feff24f8351"}}, + {ref,"02689e7992ef70b42b6b890db450ae4a7c21a50c"}}, 0}, {<<"grpc_lib">>, {git,"https://github.com/Bluehouse-Technology/grpc_lib", @@ -107,7 +107,7 @@ 0}, {<<"helium_proto">>, {git,"https://github.com/helium/proto.git", - {ref,"51df6195545a3ff75689e2377988681da467ab7a"}}, + {ref,"c88928d7e794372411674e98580e940b542a8d4f"}}, 1}, {<<"hpack">>,{pkg,<<"hpack_erl">>,<<"0.2.3">>},3}, {<<"http2_client">>, @@ -176,7 +176,7 @@ 3}, {<<"sibyl">>, {git,"https://github.com/helium/sibyl.git", - {ref,"293c4acafe57cc49bc8943a7691bda7dfd720ef4"}}, + {ref,"f7b290638b7c55dac95383abc9936580420350cb"}}, 0}, {<<"sidejob">>,{pkg,<<"sidejob">>,<<"2.1.0">>},2}, {<<"small_ints">>,{pkg,<<"small_ints">>,<<"0.1.0">>},4}, diff --git a/src/poc/grpc_client_custom.erl b/src/poc/grpc_client_custom.erl index b335927db..832b4740f 100644 --- a/src/poc/grpc_client_custom.erl +++ b/src/poc/grpc_client_custom.erl @@ -83,7 +83,10 @@ %% Passed on to the HTTP/2 client. See the documentation of 'http2_client' for the options %% that can be specified for the default HTTP2/2 client. --type connection() :: grpc_client_connection:connection(). +-type connection() :: #{http_connection := pid(), + host := binary(), + scheme := binary(), + client := module()}. -type metadata_key() :: binary(). -type metadata_value() :: binary(). @@ -106,12 +109,12 @@ -type unary_response() :: ok_response() | error_response(). -type ok_response() :: - {ok, #{result => any(), - status_message => binary(), - http_status => 200, - grpc_status => 0, - headers => metadata(), - trailers => metadata()}}. + {ok, #{result := any(), + status_message := binary(), + http_status := 200, + grpc_status := 0, + headers := metadata(), + trailers := metadata()}}. -type error_response() :: {error, #{error_type => error_type(), @@ -183,7 +186,7 @@ connect(Transport, Host, Port, Options) -> -spec new_stream(Connection::connection(), Service::atom(), Rpc::atom(), - DecoderModule::module()) -> {ok, client_stream()}. + DecoderModule::module()) -> {ok, Pid::pid()} | {error, Reason::term()}. %% @equiv new_stream(Connection, Service, Rpc, DecoderModule, []) new_stream(Connection, Service, Rpc, DecoderModule) -> new_stream(Connection, Service, Rpc, DecoderModule, []). @@ -192,7 +195,7 @@ new_stream(Connection, Service, Rpc, DecoderModule) -> Service::atom(), Rpc::atom(), DecoderModule::module(), - Options::[stream_option()]) -> {ok, client_stream()}. + Options::[stream_option()]) -> {ok, Pid::pid()} | {error, Reason::term()}. %% @doc Create a new stream to start a new RPC. new_stream(Connection, Service, Rpc, DecoderModule, Options) -> CBMod = proplists:get_value(callback_mod, Options), @@ -232,9 +235,9 @@ rcv(Stream, Timeout) -> get(Stream) -> grpc_client_stream_custom:get(Stream). --spec ping(Connection::connection(), - Timeout::timeout()) -> {ok, RoundTripTime::integer()} | - {error, term()}. +-spec ping( + Connection::connection(), + Timeout::timeout()) -> {ok, RoundTripTime::integer()} | {error, term()}. %% @doc Send a PING request. ping(Connection, Timeout) -> grpc_client_connection:ping(Connection, Timeout). @@ -280,4 +283,4 @@ unary(Connection, Message, Service, Rpc, Decoder, Options) -> lager:warning("Failed to create stream. Type: ~p, Error: ~p, Stack:~p", [_Type, _Error, _Stack]), {error, #{error_type => client, status_message => <<"stream create failed">>}} - end. \ No newline at end of file + end. diff --git a/src/poc/grpc_client_stream_custom.erl b/src/poc/grpc_client_stream_custom.erl index 58b1fa471..2bd81742d 100644 --- a/src/poc/grpc_client_stream_custom.erl +++ b/src/poc/grpc_client_stream_custom.erl @@ -53,30 +53,30 @@ %% gen_server behaviors -export([code_change/3, handle_call/3, handle_cast/2, handle_info/2, init/1, terminate/2]). -%%-type stream() :: -%% #{stream_id := integer(), -%% package := string(), -%% service := string(), -%% rpc := string(), -%% queue := queue:queue(), -%% response_pending := boolean(), -%% state := idle | open | half_closed_local | half_closed_remote | closed, -%% encoder := module(), -%% connection := grpc_client_custom:connection(), -%% headers_sent := boolean(), -%% metadata := grpc_client_custom:metadata(), -%% compression := grpc_client_custom:compression_method(), -%% buffer := binary(), -%% handler_callback := undefined, -%% handler_state := undefined, -%% type := unary | streaming | undefined}. - --spec new(Connection::pid(), +-type stream() :: + #{stream_id := integer(), + package := string(), + service := string(), + rpc := string(), + queue := queue:queue(), + response_pending := boolean(), + state := idle | open | half_closed_local | half_closed_remote | closed, + encoder := module(), + connection := grpc_client_custom:connection(), + headers_sent := boolean(), + metadata := grpc_client_custom:metadata(), + compression := grpc_client_custom:compression_method(), + buffer := binary(), + handler_callback := undefined, + handler_state := undefined, + type := unary | streaming | undefined}. + +-spec new(Connection::grpc_client_custom:connection(), Service::atom(), Rpc::atom(), Encoder::module(), Options::list(), - HandlerMod::atom() ) -> {ok, Pid::pid()} | {error, Reason::term()}. + HandlerMod::module() ) -> {ok, Pid::pid()} | {error, Reason::term()}. new(Connection, Service, Rpc, Encoder, Options, HandlerMod) -> gen_server:start_link(?MODULE, {Connection, Service, Rpc, Encoder, Options, HandlerMod}, []). diff --git a/src/poc/miner_poc_grpc_client_handler.erl b/src/poc/miner_poc_grpc_client_handler.erl index f05f91b30..a682c7fed 100644 --- a/src/poc/miner_poc_grpc_client_handler.erl +++ b/src/poc/miner_poc_grpc_client_handler.erl @@ -47,8 +47,7 @@ connect(PeerP2P) -> connect(PeerP2P, PeerIP, GRPCPort) -> try lager:debug("connecting over grpc to peer ~p via IP ~p and port ~p", [PeerP2P, PeerIP, GRPCPort]), - {ok, Connection} = grpc_client_custom:connect(tcp, PeerIP, GRPCPort), - {ok, Connection} + grpc_client_custom:connect(tcp, PeerIP, GRPCPort) catch _Error:_Reason:_Stack -> lager:warning("*** failed to connect over grpc to peer ~p. Reason ~p Stack ~p", [PeerP2P, _Reason, _Stack]), {error, failed_to_connect_to_grpc_peer} diff --git a/src/poc/miner_poc_grpc_client_statem.erl b/src/poc/miner_poc_grpc_client_statem.erl index a9dcf7747..42e3f41c9 100644 --- a/src/poc/miner_poc_grpc_client_statem.erl +++ b/src/poc/miner_poc_grpc_client_statem.erl @@ -1,9 +1,7 @@ -module(miner_poc_grpc_client_statem). -behavior(gen_statem). - -%%-dialyzer({nowarn_function, process_unary_response/1}). -%%-dialyzer({nowarn_function, handle_info/2}). -%%-dialyzer({nowarn_function, build_config_req/1}). +%% tmp disable no match warning from dialyzer +%%-dialyzer(no_match). -include("src/grpc/autogen/client/gateway_miner_client_pb.hrl"). -include_lib("public_key/include/public_key.hrl"). @@ -59,14 +57,19 @@ %% delay between validator reconnects attempts -define(VALIDATOR_RECONNECT_DELAY, 5000). +-ifdef(TEST). %% delay between active check retries %% these checks determine where to proceed with %% grpc requests... +-define(ACTIVE_CHECK_DELAY, 1000). +%% interval in seconds at which queued check target reqs are processed +-define(CHECK_TARGET_REQ_DELAY, 5000). +-else. -define(ACTIVE_CHECK_DELAY, 30000). +-define(CHECK_TARGET_REQ_DELAY, 60000). +-endif. %% delay between stream reconnects attempts -define(STREAM_RECONNECT_DELAY, 5000). -%% interval in seconds at which queued check target reqs are processed --define(CHECK_TARGET_REQ_DELAY, 60000). %% ets table name for check target reqs cache -define(CHECK_TARGET_REQS, check_target_reqs). -type data() :: #data{}. @@ -79,6 +82,7 @@ connected/3 ]). +-type unary_result() :: {error, any(), map()} | {error, any()} | {ok, any(), map()} | {ok, map()}. %% ------------------------------------------------------------------ %% API Definitions %% ------------------------------------------------------------------ @@ -108,23 +112,30 @@ stop() -> connection() -> gen_statem:call(?MODULE, connection, infinity). --spec send_report(witness | receipt, any(), binary()) -> ok. +-spec send_report( + ReportType :: witness | receipt, + Report :: #blockchain_poc_receipt_v1_pb{} | #blockchain_poc_witness_v1_pb{}, + OnionKeyHash :: binary()) -> ok. send_report(ReportType, Report, OnionKeyHash) -> gen_statem:cast(?MODULE, {send_report, ReportType, Report, OnionKeyHash, 5}). --spec send_report(witness | receipt, any(), binary(), non_neg_integer()) -> ok. +-spec send_report( + ReportType :: witness | receipt, + Report :: #blockchain_poc_receipt_v1_pb{} | #blockchain_poc_witness_v1_pb{}, + OnionKeyHash :: binary(), + Retries :: non_neg_integer()) -> ok. send_report(ReportType, Report, OnionKeyHash, Retries) -> gen_statem:cast(?MODULE, {send_report, ReportType, Report, OnionKeyHash, Retries}). --spec update_config([string()]) -> ok. +-spec update_config(UpdatedKeys::[string()]) -> ok. update_config(UpdatedKeys) -> gen_statem:cast(?MODULE, {update_config, UpdatedKeys}). --spec handle_streamed_msg(any()) -> ok. +-spec handle_streamed_msg(Msg::any()) -> ok. handle_streamed_msg(Msg) -> gen_statem:cast(?MODULE, {handle_streamed_msg, Msg}). --spec make_ets_table() -> [atom()]. +-spec make_ets_table() -> {ok,atom() | ets:tid()}. make_ets_table() -> Tab = ets:new( ?CHECK_TARGET_REQS, @@ -194,7 +205,9 @@ setup(info, find_validator, Data) -> val_grpc_port = ValPort, val_p2p_addr = ValP2P }, - [{next_event, info, connect_validator}]} + [{next_event, info, connect_validator}]}; + _ -> + {repeat_state, Data} end; setup( info, @@ -382,14 +395,14 @@ connected(_EventType, _Msg, Data) -> %% ------------------------------------------------------------------ %% Internal functions %% ------------------------------------------------------------------ --spec disconnect(data()) -> ok. +-spec disconnect(Data::data()) -> ok. disconnect(_Data = #data{connection = undefined}) -> ok; disconnect(_Data = #data{connection = Connection}) -> - catch _ = grpc_client_custom:stop_connection(Connection), + catch grpc_client_custom:stop_connection(Connection), ok. --spec find_validator() -> {error, any()} | {ok, string(), pos_integer(), string()}. +-spec find_validator() -> {ok, string(), pos_integer(), string()} | {error, any()} . find_validator() -> case application:get_env(miner, seed_validators) of {ok, SeedValidators} -> @@ -419,16 +432,19 @@ find_validator() -> } = uri_string:parse(binary_to_list(DurableValURI)), {ok, DurableValIP, DurableValGRPCPort, DurableValP2PAddr}; - {error, Reason} = _Error -> - lager:warning("request to validator failed: ~p", [_Error]), - {error, Reason} + {error, _} = Error -> + lager:warning("request to validator failed: ~p", [Error]), + {error, Error} end; _ -> lager:warning("failed to find seed validators", []), {error, find_validator_request_failed} end. --spec connect_validator(string(), string(), pos_integer()) -> +-spec connect_validator( + ValAddr::string(), + ValIP::string(), + ValPort::pos_integer()) -> {error, any()} | {ok, grpc_client_custom:connection()}. connect_validator(ValAddr, ValIP, ValPort) -> try @@ -456,8 +472,11 @@ connect_validator(ValAddr, ValIP, ValPort) -> {error, connect_validator_failed} end. --spec connect_stream_poc(grpc_client_custom:connection(), libp2p_crypto:pubkey_bin(), function()) -> - {error, any()} | {ok, pid()}. +-spec connect_stream_poc( + Connection::grpc_client_custom:connection(), + SelfPubKeyBin::libp2p_crypto:pubkey_bin(), + SelfSigFun::function()) -> + {error, any()} | {ok, pid()}. connect_stream_poc(Connection, SelfPubKeyBin, SelfSigFun) -> case miner_poc_grpc_client_handler:poc_stream(Connection, SelfPubKeyBin, SelfSigFun) of {error, _Reason} = Error -> @@ -468,8 +487,9 @@ connect_stream_poc(Connection, SelfPubKeyBin, SelfSigFun) -> Res end. --spec connect_stream_config_update(grpc_client_custom:connection()) -> - {error, any()} | {ok, pid()}. +-spec connect_stream_config_update( + Connection::grpc_client_custom:connection()) -> + {error, any()} | {ok, pid()}. connect_stream_config_update(Connection) -> case miner_poc_grpc_client_handler:config_update_stream(Connection) of {error, _Reason} = Error -> @@ -483,9 +503,9 @@ connect_stream_config_update(Connection) -> end. -spec connect_stream_region_params_update( - grpc_client_custom:connection(), - libp2p_crypto:pubkey_bin(), - function() + Connection::grpc_client_custom:connection(), + SelfPubKeyBin::libp2p_crypto:pubkey_bin(), + SelfSigFun::function() ) -> {error, any()} | {ok, pid()}. connect_stream_region_params_update(Connection, SelfPubKeyBin, SelfSigFun) -> case @@ -505,13 +525,13 @@ connect_stream_region_params_update(Connection, SelfPubKeyBin, SelfSigFun) -> end. -spec send_report( - witness | receipt, - any(), - binary(), - libp2p_crypto:pubkey_bin(), - function(), - grpc_client_custom:connection(), - non_neg_integer() + ReportType::witness | receipt, + Report::#blockchain_poc_receipt_v1_pb{} | #blockchain_poc_witness_v1_pb{}, + OnionKeyHash::binary(), + SelfPubKeyBin::libp2p_crypto:pubkey_bin(), + SigFun::function(), + Connection::grpc_client_custom:connection(), + RetryAttempts::non_neg_integer() ) -> ok. send_report(_ReportType, _Report, _OnionKeyHash, _SelfPubKeyBin, _SigFun, _Connection, 0) -> ok; @@ -541,7 +561,12 @@ send_report( do_send_report(Req, ReportType, Report, OnionKeyHash, Connection, RetryAttempts). -spec do_send_report( - binary(), witness | receipt, any(), binary(), grpc_client_custom:connection(), non_neg_integer() + Req :: #gateway_poc_report_req_v1_pb{}, + ReportType :: receipt | witness, + Report :: #blockchain_poc_receipt_v1_pb{} | #blockchain_poc_witness_v1_pb{}, + OnionKeyHash ::binary(), + Connection :: grpc_client_custom:connection(), + RetryAttempts :: non_neg_integer() ) -> ok. do_send_report(Req, ReportType, Report, OnionKeyHash, Connection, RetryAttempts) -> %% ask validator for public uri of the challenger of this POC @@ -559,7 +584,11 @@ do_send_report(Req, ReportType, Report, OnionKeyHash, Connection, RetryAttempts) end, ok. --spec fetch_config([string()], string(), pos_integer()) -> {error, any()} | ok. +-spec fetch_config( + UpdatedKeys::[string()], + ValIP::string(), + ValGRPCPort::pos_integer()) -> + {error, any()} | ok. fetch_config(UpdatedKeys, ValIP, ValGRPCPort) -> %% filter out keys we are not interested in %% and then ask our validator for current values @@ -588,8 +617,10 @@ fetch_config(UpdatedKeys, ValIP, ValGRPCPort) -> end end. --spec send_grpc_unary_req(grpc_client_custom:connection(), any(), atom()) -> - {error, any(), map()} | {error, any()} | {ok, any(), map()} | {ok, map()}. +-spec send_grpc_unary_req( + Connection::grpc_client_custom:connection(), + Req::tuple(), + RPC::atom()) -> unary_result(). send_grpc_unary_req(undefined, _Req, _RPC) -> {error, no_grpc_connection}; send_grpc_unary_req(Connection, Req, RPC) -> @@ -611,8 +642,11 @@ send_grpc_unary_req(Connection, Req, RPC) -> {error, req_failed} end. --spec send_grpc_unary_req(string(), non_neg_integer(), any(), atom()) -> - {error, any(), map()} | {error, any()} | {ok, any(), map()} | {ok, map()}. +-spec send_grpc_unary_req( + PeerIP :: string(), + GRPCPort :: non_neg_integer(), + Req :: tuple(), + RPC :: atom()) -> unary_result(). send_grpc_unary_req(PeerIP, GRPCPort, Req, RPC) -> try lager:debug("Send unary request via new connection to ip ~p: ~p", [PeerIP, Req]), @@ -628,7 +662,7 @@ send_grpc_unary_req(PeerIP, GRPCPort, Req, RPC) -> ), lager:debug("New Connection, send unary result: ~p", [Res]), %% we dont need the connection to hang around, so close it out - catch _ = grpc_client_custom:stop_connection(Connection), + _ = grpc_client_custom:stop_connection(Connection), process_unary_response(Res) catch _Class:_Error:_Stack -> @@ -642,22 +676,23 @@ build_validators_req(Quantity) -> quantity = Quantity }. --spec build_config_req([string()]) -> #gateway_config_req_v1_pb{}. +-spec build_config_req(Keys::[string()]) -> #gateway_config_req_v1_pb{}. build_config_req(Keys) -> #gateway_config_req_v1_pb{keys = Keys}. --spec build_poc_challenger_req(binary()) -> #gateway_poc_key_routing_data_req_v1_pb{}. +-spec build_poc_challenger_req(OnionKeyHash::binary()) + -> #gateway_poc_key_routing_data_req_v1_pb{}. build_poc_challenger_req(OnionKeyHash) -> #gateway_poc_key_routing_data_req_v1_pb{key = OnionKeyHash}. -spec build_check_target_req( - libp2p_crypto:pubkey_bin(), - binary(), - binary(), - non_neg_integer(), - binary(), - libp2p_crypto:pubkey_bin(), - function() + ChallengerPubKeyBin::libp2p_crypto:pubkey_bin(), + OnionKeyHash::binary(), + BlockHash::binary(), + ChallengeHeight::non_neg_integer(), + ChallengerSig::binary(), + SelfPubKeyBin::libp2p_crypto:pubkey_bin(), + SelfSigFun::function() ) -> #gateway_poc_check_challenge_target_req_v1_pb{}. build_check_target_req( ChallengerPubKeyBin, @@ -683,8 +718,7 @@ build_check_target_req( ), Req#gateway_poc_check_challenge_target_req_v1_pb{challengee_sig = SelfSigFun(ReqEncoded)}. -%% TODO: return a better and consistent response -%%-spec process_unary_response(grpc_client_custom:unary_response()) -> {error, any(), map()} | {error, any()} | {ok, any(), map()} | {ok, map()}. +-spec process_unary_response(grpc_client_custom:unary_response()) -> unary_result(). process_unary_response( {ok, #{ http_status := 200, @@ -709,100 +743,13 @@ process_unary_response( }} ) -> {ok, Payload, #{height => Height, signature => Sig}}; -process_unary_response({error, ClientError = #{error_type := 'client'}}) -> - lager:warning("grpc error response ~p", [ClientError]), - {error, grpc_client_error}; -process_unary_response( - {error, ClientError = #{error_type := 'grpc', http_status := 200, status_message := ErrorMsg}} -) -> - lager:warning("grpc error response ~p", [ClientError]), - {error, ErrorMsg}; -process_unary_response(_Response) -> - lager:warning("unhandled grpc response ~p", [_Response]), - {error, unexpected_response}. - -handle_down_event( - _CurState, - {'DOWN', Ref, process, _, Reason}, - Data = #data{conn_monitor_ref = Ref, connection = Connection} -) -> - lager:warning("GRPC connection to validator is down, reconnecting. Reason: ~p", [Reason]), - _ = grpc_client_custom:stop_connection(Connection), - %% if the connection goes down, enter setup state to reconnect - {next_state, setup, Data}; -handle_down_event( - _CurState, - {'DOWN', Ref, process, _, Reason} = Event, - Data = #data{ - stream_poc_monitor_ref = Ref, - connection = Connection, - self_pub_key_bin = SelfPubKeyBin, - self_sig_fun = SelfSigFun - } -) -> - %% the poc stream is meant to be long lived, we always want it up as long as we have a grpc connection - %% so if it goes down start it back up again - lager:warning("poc stream to validator is down, reconnecting. Reason: ~p", [Reason]), - case connect_stream_poc(Connection, SelfPubKeyBin, SelfSigFun) of - {ok, StreamPid} -> - M = erlang:monitor(process, StreamPid), - {keep_state, Data#data{stream_poc_monitor_ref = M, stream_poc_pid = StreamPid}}; - {error, _} -> - %% if stream reconnnect fails, replay the orig down msg to trigger another attempt - %% NOTE: not using transition actions below as want a delay before the msgs get processed again - erlang:send_after(?STREAM_RECONNECT_DELAY, self(), Event), - {keep_state, Data} - end; -handle_down_event( - _CurState, - {'DOWN', Ref, process, _, Reason} = Event, - Data = #data{ - stream_config_update_monitor_ref = Ref, - connection = Connection - } -) -> - %% the config_update stream is meant to be long lived, we always want it up as long as we have a grpc connection - %% so if it goes down start it back up again - lager:warning("config_update stream to validator is down, reconnecting. Reason: ~p", [Reason]), - case connect_stream_config_update(Connection) of - {ok, StreamPid} -> - M = erlang:monitor(process, StreamPid), - {keep_state, Data#data{ - stream_config_update_monitor_ref = M, stream_config_update_pid = StreamPid - }}; - {error, _} -> - %% if stream reconnnect fails, replay the orig down msg to trigger another attempt - %% NOTE: not using transition actions below as want a delay before the msgs get processed again - erlang:send_after(?STREAM_RECONNECT_DELAY, self(), Event), - {keep_state, Data} - end; -handle_down_event( - _CurState, - {'DOWN', Ref, process, _, Reason} = Event, - Data = #data{ - stream_region_params_update_monitor_ref = Ref, - connection = Connection, - self_pub_key_bin = SelfPubKeyBin, - self_sig_fun = SelfSigFun - } -) -> - %% the region_params stream is meant to be long lived, we always want it up as long as we have a grpc connection - %% so if it goes down start it back up again - lager:warning("region_params_update stream to validator is down, reconnecting. Reason: ~p", [Reason]), - case connect_stream_region_params_update(Connection, SelfPubKeyBin, SelfSigFun) of - {ok, StreamPid} -> - M = erlang:monitor(process, StreamPid), - {keep_state, Data#data{ - stream_region_params_update_monitor_ref = M, stream_region_params_update_pid = StreamPid - }}; - {error, _} -> - %% if stream reconnnect fails, replay the orig down msg to trigger another attempt - %% NOTE: not using transition actions below as want a delay before the msgs get processed again - erlang:send_after(?STREAM_RECONNECT_DELAY, self(), Event), - {keep_state, Data} - end. +process_unary_response({error, _}) -> + lager:warning("grpc client error response", []), + {error, grpc_request_failed}. --spec get_uri_for_challenger(binary(), grpc_client_custom:connection()) -> +-spec get_uri_for_challenger( + OnionKeyHash::binary(), + Connection::grpc_client_custom:connection()) -> {ok, {string(), pos_integer()}} | {error, any()}. get_uri_for_challenger(OnionKeyHash, Connection) -> Req = build_poc_challenger_req(OnionKeyHash), @@ -860,8 +807,8 @@ process_check_target_reqs() -> ). -spec cache_check_target_req( - binary(), - { + ID::binary(), + ReqData::{ string(), libp2p_crypto:pubkey_bin(), binary(), @@ -878,67 +825,19 @@ cache_check_target_req(ID, ReqData) -> cached_check_target_reqs() -> ets:tab2list(?CHECK_TARGET_REQS). --spec delete_cached_check_target_req(binary()) -> ok. +-spec delete_cached_check_target_req(Key::binary()) -> ok. delete_cached_check_target_req(Key) -> true = ets:delete(?CHECK_TARGET_REQS, Key), ok. -do_handle_streamed_msg( - #gateway_resp_v1_pb{ - msg = {poc_challenge_resp, ChallengeNotification}, - height = NotificationHeight, - signature = ChallengerSig - } = Msg, - State -) -> - lager:debug("grpc client received gateway_poc_challenge_notification_resp_v1 msg ~p", [Msg]), - #gateway_poc_challenge_notification_resp_v1_pb{ - challenger = #routing_address_pb{uri = URI, pub_key = PubKeyBin}, - block_hash = BlockHash, - onion_key_hash = OnionKeyHash - } = ChallengeNotification, - _ = check_if_target( - URI, PubKeyBin, OnionKeyHash, BlockHash, NotificationHeight, ChallengerSig, false - ), - State; -do_handle_streamed_msg( - #gateway_resp_v1_pb{ - msg = {config_update_streamed_resp, Payload}, - height = _NotificationHeight, - signature = _ChallengerSig - } = _Msg, - State -) -> - lager:debug("grpc client received config_update_streamed_resp msg ~p", [_Msg]), - #gateway_config_update_streamed_resp_v1_pb{keys = UpdatedKeys} = Payload, - _ = miner_poc_grpc_client_statem:update_config(UpdatedKeys), - State; -do_handle_streamed_msg( - #gateway_resp_v1_pb{ - msg = {region_params_streamed_resp, Payload}, - height = _NotificationHeight, - signature = _ChallengerSig - } = _Msg, - State -) -> - lager:debug("grpc client received region_params_streamed_resp msg ~p", [_Msg]), - #gateway_region_params_streamed_resp_v1_pb{region = Region, params = Params} = Payload, - #blockchain_region_params_v1_pb{region_params = RegionParams} = Params, - miner_lora_light:region_params_update(Region, RegionParams), - miner_onion_server_light:region_params_update(Region, RegionParams), - State; -do_handle_streamed_msg(_Msg, State) -> - lager:warning("grpc client received unexpected streamed msg ~p", [_Msg]), - State. - -spec check_if_target( - string(), - libp2p_crypto:pubkey_bin(), - binary(), - binary(), - pos_integer(), - function(), - boolean() + URI :: string(), + PubKeyBin :: binary(), + OnionKeyHash :: binary(), + BlockHash :: binary(), + NotificationHeight :: non_neg_integer(), + ChallengerSig :: binary(), + IsRetry :: boolean() ) -> ok. check_if_target( URI, PubKeyBin, OnionKeyHash, BlockHash, NotificationHeight, ChallengerSig, IsRetry @@ -959,7 +858,7 @@ check_if_target( %% we got an expected response, purge req from queued cache should it exist _ = delete_cached_check_target_req(OnionKeyHash), handle_check_target_resp(Result); - {error, <<"queued_poc">>, #{height := ValRespHeight} = _Details} -> + {error, <<"queued_poc">>, _Details = #{height := ValRespHeight}} -> %% seems the POC key exists but the POC itself may not yet be initialised %% this can happen if the challenging validator is behind our %% notifying validator @@ -987,19 +886,22 @@ check_if_target( _ = delete_cached_check_target_req(OnionKeyHash), ok; {error, _Reason} -> + %% we got an non queued response, purge req from queued cache should it exist + _ = delete_cached_check_target_req(OnionKeyHash), ok end end, - spawn(F). + spawn(F), + ok. -spec send_check_target_req( - string(), - libp2p_crypto:pubkey_bin(), - binary(), - binary(), - non_neg_integer(), - libp2p_crypto:signature() -) -> {error, any()} | {error, any(), map()} | {ok, any(), map()}. + ChallengerURI :: string(), + ChallengerPubKeyBin :: binary(), + OnionKeyHash :: binary(), + BlockHash :: binary(), + NotificationHeight :: non_neg_integer(), + ChallengerSig :: binary() +) -> unary_result(). send_check_target_req( ChallengerURI, ChallengerPubKeyBin, @@ -1012,7 +914,7 @@ send_check_target_req( {ok, _, SelfSigFun, _} = blockchain_swarm:keys(), %% split the URI into its IP and port parts #{host := IP, port := Port, scheme := _Scheme} = - uri_string:parse(binary_to_list(ChallengerURI)), + uri_string:parse(ChallengerURI), TargetIP = maybe_override_ip(IP), %% build the request Req = build_check_target_req( @@ -1036,6 +938,137 @@ handle_check_target_resp( ) -> ok. + + +do_handle_streamed_msg( + #gateway_resp_v1_pb{ + msg = {poc_challenge_resp, ChallengeNotification}, + height = NotificationHeight, + signature = ChallengerSig + } = Msg, + State +) -> + lager:debug("grpc client received gateway_poc_challenge_notification_resp_v1 msg ~p", [Msg]), + #gateway_poc_challenge_notification_resp_v1_pb{ + challenger = #routing_address_pb{uri = URI, pub_key = PubKeyBin}, + block_hash = BlockHash, + onion_key_hash = OnionKeyHash + } = ChallengeNotification, + _ = check_if_target( + binary_to_list(URI), PubKeyBin, OnionKeyHash, BlockHash, NotificationHeight, ChallengerSig, false + ), + State; +do_handle_streamed_msg( + #gateway_resp_v1_pb{ + msg = {config_update_streamed_resp, Payload}, + height = _NotificationHeight, + signature = _ChallengerSig + } = _Msg, + State +) -> + lager:debug("grpc client received config_update_streamed_resp msg ~p", [_Msg]), + #gateway_config_update_streamed_resp_v1_pb{keys = UpdatedKeys} = Payload, + _ = miner_poc_grpc_client_statem:update_config(UpdatedKeys), + State; +do_handle_streamed_msg( + #gateway_resp_v1_pb{ + msg = {region_params_streamed_resp, Payload}, + height = _NotificationHeight, + signature = _ChallengerSig + } = _Msg, + State +) -> + lager:debug("grpc client received region_params_streamed_resp msg ~p", [_Msg]), + #gateway_region_params_streamed_resp_v1_pb{region = Region, params = Params} = Payload, + #blockchain_region_params_v1_pb{region_params = RegionParams} = Params, + miner_lora_light:region_params_update(Region, RegionParams), + miner_onion_server_light:region_params_update(Region, RegionParams), + State; +do_handle_streamed_msg(_Msg, State) -> + lager:warning("grpc client received unexpected streamed msg ~p", [_Msg]), + State. + +handle_down_event( + _CurState, + {'DOWN', Ref, process, _, Reason}, + Data = #data{conn_monitor_ref = Ref, connection = Connection} +) -> + lager:warning("GRPC connection to validator is down, reconnecting. Reason: ~p", [Reason]), + _ = grpc_client_custom:stop_connection(Connection), + %% if the connection goes down, enter setup state to reconnect + {next_state, setup, Data}; +handle_down_event( + _CurState, + {'DOWN', Ref, process, _, Reason} = Event, + Data = #data{ + stream_poc_monitor_ref = Ref, + connection = Connection, + self_pub_key_bin = SelfPubKeyBin, + self_sig_fun = SelfSigFun + } +) -> + %% the poc stream is meant to be long lived, we always want it up as long as we have a grpc connection + %% so if it goes down start it back up again + lager:warning("poc stream to validator is down, reconnecting. Reason: ~p", [Reason]), + case connect_stream_poc(Connection, SelfPubKeyBin, SelfSigFun) of + {ok, StreamPid} -> + M = erlang:monitor(process, StreamPid), + {keep_state, Data#data{stream_poc_monitor_ref = M, stream_poc_pid = StreamPid}}; + {error, _} -> + %% if stream reconnnect fails, replay the orig down msg to trigger another attempt + %% NOTE: not using transition actions below as want a delay before the msgs get processed again + erlang:send_after(?STREAM_RECONNECT_DELAY, self(), Event), + {keep_state, Data} + end; +handle_down_event( + _CurState, + {'DOWN', Ref, process, _, Reason} = Event, + Data = #data{ + stream_config_update_monitor_ref = Ref, + connection = Connection + } +) -> + %% the config_update stream is meant to be long lived, we always want it up as long as we have a grpc connection + %% so if it goes down start it back up again + lager:warning("config_update stream to validator is down, reconnecting. Reason: ~p", [Reason]), + case connect_stream_config_update(Connection) of + {ok, StreamPid} -> + M = erlang:monitor(process, StreamPid), + {keep_state, Data#data{ + stream_config_update_monitor_ref = M, stream_config_update_pid = StreamPid + }}; + {error, _} -> + %% if stream reconnnect fails, replay the orig down msg to trigger another attempt + %% NOTE: not using transition actions below as want a delay before the msgs get processed again + erlang:send_after(?STREAM_RECONNECT_DELAY, self(), Event), + {keep_state, Data} + end; +handle_down_event( + _CurState, + {'DOWN', Ref, process, _, Reason} = Event, + Data = #data{ + stream_region_params_update_monitor_ref = Ref, + connection = Connection, + self_pub_key_bin = SelfPubKeyBin, + self_sig_fun = SelfSigFun + } +) -> + %% the region_params stream is meant to be long lived, we always want it up as long as we have a grpc connection + %% so if it goes down start it back up again + lager:warning("region_params_update stream to validator is down, reconnecting. Reason: ~p", [Reason]), + case connect_stream_region_params_update(Connection, SelfPubKeyBin, SelfSigFun) of + {ok, StreamPid} -> + M = erlang:monitor(process, StreamPid), + {keep_state, Data#data{ + stream_region_params_update_monitor_ref = M, stream_region_params_update_pid = StreamPid + }}; + {error, _} -> + %% if stream reconnnect fails, replay the orig down msg to trigger another attempt + %% NOTE: not using transition actions below as want a delay before the msgs get processed again + erlang:send_after(?STREAM_RECONNECT_DELAY, self(), Event), + {keep_state, Data} + end. + -ifdef(TEST). maybe_override_ip(_IP) -> "127.0.0.1". diff --git a/src/poc/miner_poc_mgr.erl b/src/poc/miner_poc_mgr.erl index f78539b13..ff84156ac 100644 --- a/src/poc/miner_poc_mgr.erl +++ b/src/poc/miner_poc_mgr.erl @@ -347,7 +347,6 @@ handle_info(init, #state{chain = undefined} = State) -> Chain -> ok = blockchain_event:add_handler(self()), Ledger = blockchain:ledger(Chain), - ok = miner_poc:add_stream_handler(blockchain_swarm:tid(), miner_poc_report_handler), SelfPubKeyBin = blockchain_swarm:pubkey_bin(), {noreply, State#state{ chain = Chain, diff --git a/test/miner_poc_grpc_SUITE.erl b/test/miner_poc_grpc_SUITE.erl index ae607ee82..3ba180228 100644 --- a/test/miner_poc_grpc_SUITE.erl +++ b/test/miner_poc_grpc_SUITE.erl @@ -598,8 +598,10 @@ extra_vars(grpc, TargetVersion) -> ?poc_receipts_absorb_timeout => 2, ?poc_validator_ephemeral_key_timeout => 50, ?poc_activity_filter_enabled => true, - ?h3dex_gc_width => 10, ?poc_targeting_version => TargetVersion, + ?h3dex_gc_width => 10, + ?poc_target_hex_parent_res => 5, + ?poc_target_hex_collection_res => 5, ?poc_target_pool_size => 2, ?poc_hexing_type => hex_h3dex, ?hip17_interactivity_blocks => 20