From efd205b4d23cd45f21fcd24231c5717a1c428271 Mon Sep 17 00:00:00 2001 From: Jed <4679729+jedleggett@users.noreply.github.com> Date: Tue, 16 May 2023 12:35:56 -0500 Subject: [PATCH 1/9] squash admin config --- core/src/proxy/block_engine_stage.rs | 137 +++++++---- core/src/proxy/relayer_stage.rs | 147 +++++++---- core/src/tpu.rs | 97 +++----- core/src/validator.rs | 14 +- local-cluster/src/validator_configs.rs | 4 +- validator/src/admin_rpc_service.rs | 83 ++++++- validator/src/bin/solana-test-validator.rs | 9 +- validator/src/main.rs | 269 ++++++++++++++------- 8 files changed, 506 insertions(+), 254 deletions(-) diff --git a/core/src/proxy/block_engine_stage.rs b/core/src/proxy/block_engine_stage.rs index 6b87f9a9fe..cf80d1a919 100644 --- a/core/src/proxy/block_engine_stage.rs +++ b/core/src/proxy/block_engine_stage.rs @@ -46,6 +46,7 @@ use { const CONNECTION_TIMEOUT_S: u64 = 10; const CONNECTION_BACKOFF_S: u64 = 5; +const CONFIG_BACKOFF_S: u64 = 30; #[derive(Default)] struct BlockEngineStageStats { @@ -72,13 +73,10 @@ pub struct BlockBuilderFeeInfo { pub block_builder_commission: u64, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default, PartialEq, Eq)] pub struct BlockEngineConfig { - /// Address to the external auth-service responsible for generating access tokens. - pub auth_service_endpoint: Endpoint, - - /// Primary backend endpoint. - pub backend_endpoint: Endpoint, + /// Block Engine URL + pub block_engine_url: String, /// If set then it will be assumed the backend verified packets so signature verification will be bypassed in the validator. pub trust_packets: bool, @@ -90,7 +88,7 @@ pub struct BlockEngineStage { impl BlockEngineStage { pub fn new( - block_engine_config: BlockEngineConfig, + block_engine_config: Arc>, // Channel that bundles get piped through. bundle_tx: Sender>, // The keypair stored here is used to sign auth challenges. @@ -137,7 +135,7 @@ impl BlockEngineStage { #[allow(clippy::too_many_arguments)] async fn start( - block_engine_config: BlockEngineConfig, + block_engine_config: Arc>, cluster_info: Arc, bundle_tx: Sender>, packet_tx: Sender, @@ -147,10 +145,15 @@ impl BlockEngineStage { ) { const CONNECTION_TIMEOUT: Duration = Duration::from_secs(CONNECTION_TIMEOUT_S); const CONNECTION_BACKOFF: Duration = Duration::from_secs(CONNECTION_BACKOFF_S); + const CONFIG_BACKOFF: Duration = Duration::from_secs(CONFIG_BACKOFF_S); let mut error_count: u64 = 0; while !exit.load(Ordering::Relaxed) { - if let Err(e) = Self::connect_auth_and_stream( + // Wait until a valid config is supplied (either initially or by admin rpc) + // Use if!/else here to avoid extra CONFIG_BACKOFF wait on successful termination + if !Self::validate_block_engine_config(&block_engine_config.lock().unwrap()) { + sleep(CONFIG_BACKOFF).await; + } else if let Err(e) = Self::connect_auth_and_stream( &block_engine_config, &cluster_info, &bundle_tx, @@ -183,7 +186,7 @@ impl BlockEngineStage { } async fn connect_auth_and_stream( - block_engine_config: &BlockEngineConfig, + block_engine_config: &Arc>, cluster_info: &Arc, bundle_tx: &Sender>, packet_tx: &Sender, @@ -194,18 +197,48 @@ impl BlockEngineStage { ) -> crate::proxy::Result<()> { // Get a copy of configs here in case they have changed at runtime let keypair = cluster_info.keypair().clone(); + let local_config = &block_engine_config.lock().unwrap().clone(); + + let mut auth_service_endpoint = + Endpoint::from_shared(local_config.block_engine_url.clone()).map_err(|_| { + ProxyError::AuthenticationConnectionError( + "invalid block engine url value".parse().unwrap(), + ) + })?; + let mut backend_endpoint = Endpoint::from_shared(local_config.block_engine_url.clone()) + .map_err(|_| { + ProxyError::BlockEngineConnectionError( + "invalid block engine url value".parse().unwrap(), + ) + })? + .tcp_keepalive(Some(Duration::from_secs(60))); + + if local_config.block_engine_url.contains("https") { + auth_service_endpoint = auth_service_endpoint + .tls_config(tonic::transport::ClientTlsConfig::new()) + .map_err(|_| { + ProxyError::AuthenticationConnectionError( + "failed to set tls_config for block engine auth service" + .parse() + .unwrap(), + ) + })?; + backend_endpoint = backend_endpoint + .tls_config(tonic::transport::ClientTlsConfig::new()) + .map_err(|_| { + ProxyError::BlockEngineConnectionError( + "failed to set tls_config for block engine service" + .parse() + .unwrap(), + ) + })?; + } - debug!( - "connecting to auth: {:?}", - block_engine_config.auth_service_endpoint.uri() - ); - let auth_channel = timeout( - *connection_timeout, - block_engine_config.auth_service_endpoint.connect(), - ) - .await - .map_err(|_| ProxyError::AuthenticationConnectionTimeout)? - .map_err(|e| ProxyError::AuthenticationConnectionError(e.to_string()))?; + debug!("connecting to auth: {:?}", &local_config.block_engine_url); + let auth_channel = timeout(*connection_timeout, auth_service_endpoint.connect()) + .await + .map_err(|_| ProxyError::AuthenticationConnectionTimeout)? + .map_err(|e| ProxyError::AuthenticationConnectionError(e.to_string()))?; let mut auth_client = AuthServiceClient::new(auth_channel); @@ -219,25 +252,18 @@ impl BlockEngineStage { datapoint_info!( "block_engine_stage-tokens_generated", - ( - "url", - block_engine_config.auth_service_endpoint.uri().to_string(), - String - ), + ("url", &local_config.block_engine_url, String), ("count", 1, i64), ); debug!( "connecting to block engine: {:?}", - block_engine_config.backend_endpoint.uri() + &local_config.block_engine_url ); - let block_engine_channel = timeout( - *connection_timeout, - block_engine_config.backend_endpoint.connect(), - ) - .await - .map_err(|_| ProxyError::BlockEngineConnectionTimeout)? - .map_err(|e| ProxyError::BlockEngineConnectionError(e.to_string()))?; + let block_engine_channel = timeout(*connection_timeout, backend_endpoint.connect()) + .await + .map_err(|_| ProxyError::BlockEngineConnectionTimeout)? + .map_err(|e| ProxyError::BlockEngineConnectionError(e.to_string()))?; let access_token = Arc::new(Mutex::new(access_token)); let block_engine_client = BlockEngineValidatorClient::with_interceptor( @@ -246,13 +272,14 @@ impl BlockEngineStage { ); Self::start_consuming_block_engine_bundles_and_packets( - &bundle_tx, + bundle_tx, block_engine_client, - &packet_tx, + packet_tx, + local_config, block_engine_config, - &verified_packet_tx, - &exit, - &block_builder_fee_info, + verified_packet_tx, + exit, + block_builder_fee_info, auth_client, access_token, refresh_token, @@ -263,11 +290,13 @@ impl BlockEngineStage { .await } + #[allow(clippy::too_many_arguments)] async fn start_consuming_block_engine_bundles_and_packets( bundle_tx: &Sender>, mut client: BlockEngineValidatorClient>, packet_tx: &Sender, - block_engine_config: &BlockEngineConfig, + local_config: &BlockEngineConfig, + global_config: &Arc>, verified_packet_tx: &Sender<(Vec, Option)>, exit: &Arc, block_builder_fee_info: &Arc>, @@ -317,7 +346,8 @@ impl BlockEngineStage { (subscribe_bundles_stream, subscribe_packets_stream), bundle_tx, packet_tx, - block_engine_config, + local_config, + global_config, verified_packet_tx, exit, block_builder_fee_info, @@ -331,6 +361,7 @@ impl BlockEngineStage { .await } + #[allow(clippy::too_many_arguments)] async fn consume_bundle_and_packet_stream( mut client: BlockEngineValidatorClient>, (mut bundle_stream, mut packet_stream): ( @@ -339,7 +370,8 @@ impl BlockEngineStage { ), bundle_tx: &Sender>, packet_tx: &Sender, - block_engine_config: &BlockEngineConfig, + local_config: &BlockEngineConfig, + global_config: &Arc>, verified_packet_tx: &Sender<(Vec, Option)>, exit: &Arc, block_builder_fee_info: &Arc>, @@ -357,9 +389,8 @@ impl BlockEngineStage { let mut num_full_refreshes: u64 = 1; let mut num_refresh_access_token: u64 = 0; let mut block_engine_stats = BlockEngineStageStats::default(); - let mut metrics_tick = interval(METRICS_TICK); + let mut metrics_and_auth_tick = interval(METRICS_TICK); let mut maintenance_tick = interval(MAINTENANCE_TICK); - let auth_uri_string = block_engine_config.auth_service_endpoint.uri().to_string(); info!("connected to packet and bundle stream"); @@ -367,12 +398,12 @@ impl BlockEngineStage { tokio::select! { maybe_msg = packet_stream.message() => { let resp = maybe_msg?.ok_or(ProxyError::GrpcStreamDisconnected)?; - Self::handle_block_engine_packets(resp, packet_tx, verified_packet_tx, block_engine_config.trust_packets, &mut block_engine_stats)?; + Self::handle_block_engine_packets(resp, packet_tx, verified_packet_tx, local_config.trust_packets, &mut block_engine_stats)?; } maybe_bundles = bundle_stream.message() => { Self::handle_block_engine_maybe_bundles(maybe_bundles, bundle_tx, &mut block_engine_stats)?; } - _ = metrics_tick.tick() => { + _ = metrics_and_auth_tick.tick() => { block_engine_stats.report(); block_engine_stats = BlockEngineStageStats::default(); @@ -380,10 +411,14 @@ impl BlockEngineStage { return Err(ProxyError::AuthenticationConnectionError("Validator ID Changed".to_string())); } + if *global_config.lock().unwrap() != *local_config { + return Err(ProxyError::AuthenticationConnectionError("Block Engine Config Changed".to_string())); + } + let (maybe_new_access, maybe_new_refresh) = maybe_refresh_auth_tokens(&mut auth_client, &access_token, &refresh_token, - &cluster_info, + cluster_info, connection_timeout, refresh_within_s, ).await?; @@ -392,7 +427,7 @@ impl BlockEngineStage { num_refresh_access_token += 1; datapoint_info!( "block_engine_stage-refresh_access_token", - ("url", auth_uri_string, String), + ("url", &local_config.block_engine_url, String), ("count", num_refresh_access_token, i64), ); *access_token.lock().unwrap() = new_token; @@ -401,7 +436,7 @@ impl BlockEngineStage { num_full_refreshes += 1; datapoint_info!( "block_engine_stage-tokens_generated", - ("url", auth_uri_string, String), + ("url", &local_config.block_engine_url, String), ("count", num_full_refreshes, i64), ); refresh_token = new_token; @@ -500,4 +535,8 @@ impl BlockEngineStage { Ok(()) } + + fn validate_block_engine_config(config: &BlockEngineConfig) -> bool { + !config.block_engine_url.is_empty() + } } diff --git a/core/src/proxy/relayer_stage.rs b/core/src/proxy/relayer_stage.rs index cb31a2abee..dbc3d8252f 100644 --- a/core/src/proxy/relayer_stage.rs +++ b/core/src/proxy/relayer_stage.rs @@ -47,6 +47,7 @@ use { const CONNECTION_TIMEOUT_S: u64 = 10; const CONNECTION_BACKOFF_S: u64 = 5; +const CONFIG_BACKOFF_S: u64 = 30; #[derive(Default)] struct RelayerStageStats { @@ -66,13 +67,10 @@ impl RelayerStageStats { } } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default, PartialEq, Eq)] pub struct RelayerConfig { - /// Address to the external auth-service responsible for generating access tokens. - pub auth_service_endpoint: Endpoint, - - /// Primary backend endpoint. - pub backend_endpoint: Endpoint, + /// Relayer URL + pub relayer_url: String, /// Interval at which heartbeats are expected. pub expected_heartbeat_interval: Duration, @@ -90,7 +88,7 @@ pub struct RelayerStage { impl RelayerStage { pub fn new( - relayer_config: RelayerConfig, + relayer_config: Arc>, // The keypair stored here is used to sign auth challenges. cluster_info: Arc, // Channel that server-sent heartbeats are piped through. @@ -134,7 +132,7 @@ impl RelayerStage { #[allow(clippy::too_many_arguments)] async fn start( - relayer_config: RelayerConfig, + relayer_config: Arc>, cluster_info: Arc, heartbeat_tx: Sender, packet_tx: Sender, @@ -143,10 +141,16 @@ impl RelayerStage { ) { const CONNECTION_TIMEOUT: Duration = Duration::from_secs(CONNECTION_TIMEOUT_S); const CONNECTION_BACKOFF: Duration = Duration::from_secs(CONNECTION_BACKOFF_S); + const CONFIG_BACKOFF: Duration = Duration::from_secs(CONFIG_BACKOFF_S); + let mut error_count: u64 = 0; while !exit.load(Ordering::Relaxed) { - if let Err(e) = Self::connect_auth_and_stream( + // Wait until a valid config is supplied (either initially or by admin rpc) + // Use if!/else here to avoid extra CONFIG_BACKOFF wait on successful termination + if !Self::validate_relayer_config(&relayer_config.lock().unwrap()) { + sleep(CONFIG_BACKOFF).await; + } else if let Err(e) = Self::connect_auth_and_stream( &relayer_config, &cluster_info, &heartbeat_tx, @@ -178,7 +182,7 @@ impl RelayerStage { } async fn connect_auth_and_stream( - relayer_config: &RelayerConfig, + relayer_config: &Arc>, cluster_info: &Arc, heartbeat_tx: &Sender, packet_tx: &Sender, @@ -188,18 +192,45 @@ impl RelayerStage { ) -> crate::proxy::Result<()> { // Get a copy of configs here in case they have changed at runtime let keypair = cluster_info.keypair().clone(); + let local_config = relayer_config.lock().unwrap().clone(); + + let mut auth_service_endpoint = Endpoint::from_shared(local_config.relayer_url.clone()) + .map_err(|_| { + ProxyError::AuthenticationConnectionError( + "invalid relayer url value".parse().unwrap(), + ) + })?; + let mut backend_endpoint = Endpoint::from_shared(local_config.relayer_url.clone()) + .map_err(|_| { + ProxyError::RelayerConnectionError("invalid relayer url value".parse().unwrap()) + })? + .tcp_keepalive(Some(Duration::from_secs(60))); + if local_config.relayer_url.contains("https") { + auth_service_endpoint = auth_service_endpoint + .tls_config(tonic::transport::ClientTlsConfig::new()) + .map_err(|_| { + ProxyError::AuthenticationConnectionError( + "failed to set tls_config for relayer auth service" + .parse() + .unwrap(), + ) + })?; + backend_endpoint = backend_endpoint + .tls_config(tonic::transport::ClientTlsConfig::new()) + .map_err(|_| { + ProxyError::RelayerConnectionError( + "failed to set tls_config for relayer service" + .parse() + .unwrap(), + ) + })?; + } - debug!( - "connecting to auth: {:?}", - relayer_config.auth_service_endpoint.uri() - ); - let auth_channel = timeout( - *connection_timeout, - relayer_config.auth_service_endpoint.connect(), - ) - .await - .map_err(|_| ProxyError::AuthenticationConnectionTimeout)? - .map_err(|e| ProxyError::AuthenticationConnectionError(e.to_string()))?; + debug!("connecting to auth: {:?}", local_config.relayer_url); + let auth_channel = timeout(*connection_timeout, auth_service_endpoint.connect()) + .await + .map_err(|_| ProxyError::AuthenticationConnectionTimeout)? + .map_err(|e| ProxyError::AuthenticationConnectionError(e.to_string()))?; let mut auth_client = AuthServiceClient::new(auth_channel); @@ -213,25 +244,15 @@ impl RelayerStage { datapoint_info!( "relayer_stage-tokens_generated", - ( - "url", - relayer_config.auth_service_endpoint.uri().to_string(), - String - ), + ("url", local_config.relayer_url, String), ("count", 1, i64), ); - debug!( - "connecting to relayer: {:?}", - relayer_config.backend_endpoint.uri() - ); - let relayer_channel = timeout( - *connection_timeout, - relayer_config.backend_endpoint.connect(), - ) - .await - .map_err(|_| ProxyError::RelayerConnectionTimeout)? - .map_err(|e| ProxyError::RelayerConnectionError(e.to_string()))?; + debug!("connecting to relayer: {:?}", local_config.relayer_url); + let relayer_channel = timeout(*connection_timeout, backend_endpoint.connect()) + .await + .map_err(|_| ProxyError::RelayerConnectionTimeout)? + .map_err(|e| ProxyError::RelayerConnectionError(e.to_string()))?; let access_token = Arc::new(Mutex::new(access_token)); let relayer_client = RelayerClient::with_interceptor( @@ -244,8 +265,9 @@ impl RelayerStage { heartbeat_tx, packet_tx, verified_packet_tx, + &local_config, relayer_config, - &exit, + exit, auth_client, access_token, refresh_token, @@ -256,12 +278,14 @@ impl RelayerStage { .await } + #[allow(clippy::too_many_arguments)] async fn start_consuming_relayer_packets( mut client: RelayerClient>, heartbeat_tx: &Sender, packet_tx: &Sender, verified_packet_tx: &Sender<(Vec, Option)>, - relayer_config: &RelayerConfig, + local_config: &RelayerConfig, + global_config: &Arc>, exit: &Arc, auth_client: AuthServiceClient, access_token: Arc>, @@ -309,7 +333,8 @@ impl RelayerStage { heartbeat_tx, packet_stream, packet_tx, - relayer_config, + local_config, + global_config, verified_packet_tx, exit, auth_client, @@ -322,12 +347,14 @@ impl RelayerStage { .await } + #[allow(clippy::too_many_arguments)] async fn consume_packet_stream( heartbeat_event: HeartbeatEvent, heartbeat_tx: &Sender, mut packet_stream: Streaming, packet_tx: &Sender, - relayer_config: &RelayerConfig, + local_config: &RelayerConfig, + global_config: &Arc>, verified_packet_tx: &Sender<(Vec, Option)>, exit: &Arc, mut auth_client: AuthServiceClient, @@ -341,30 +368,28 @@ impl RelayerStage { let refresh_within_s: u64 = METRICS_TICK.as_secs().saturating_mul(3).saturating_div(2); let mut relayer_stats = RelayerStageStats::default(); - let mut metrics_tick = interval(METRICS_TICK); + let mut metrics_and_auth_tick = interval(METRICS_TICK); let mut num_full_refreshes: u64 = 1; let mut num_refresh_access_token: u64 = 0; - let mut heartbeat_check_interval = interval(relayer_config.expected_heartbeat_interval); + let mut heartbeat_check_interval = interval(local_config.expected_heartbeat_interval); let mut last_heartbeat_ts = Instant::now(); - let auth_uri_string = relayer_config.auth_service_endpoint.uri().to_string(); - info!("connected to packet stream"); while !exit.load(Ordering::Relaxed) { tokio::select! { maybe_msg = packet_stream.message() => { let resp = maybe_msg?.ok_or(ProxyError::GrpcStreamDisconnected)?; - Self::handle_relayer_packets(resp, heartbeat_event, heartbeat_tx, &mut last_heartbeat_ts, packet_tx, relayer_config.trust_packets, verified_packet_tx, &mut relayer_stats)?; + Self::handle_relayer_packets(resp, heartbeat_event, heartbeat_tx, &mut last_heartbeat_ts, packet_tx, local_config.trust_packets, verified_packet_tx, &mut relayer_stats)?; } _ = heartbeat_check_interval.tick() => { - if last_heartbeat_ts.elapsed() > relayer_config.oldest_allowed_heartbeat { + if last_heartbeat_ts.elapsed() > local_config.oldest_allowed_heartbeat { return Err(ProxyError::HeartbeatExpired); } } - _ = metrics_tick.tick() => { + _ = metrics_and_auth_tick.tick() => { relayer_stats.report(); relayer_stats = RelayerStageStats::default(); @@ -372,10 +397,14 @@ impl RelayerStage { return Err(ProxyError::AuthenticationConnectionError("Validator ID Changed".to_string())); } + if *global_config.lock().unwrap() != *local_config { + return Err(ProxyError::AuthenticationConnectionError("Relayer Config Changed".to_string())); + } + let (maybe_new_access, maybe_new_refresh) = maybe_refresh_auth_tokens(&mut auth_client, &access_token, &refresh_token, - &cluster_info, + cluster_info, connection_timeout, refresh_within_s, ).await?; @@ -384,7 +413,7 @@ impl RelayerStage { num_refresh_access_token += 1; datapoint_info!( "relayer_stage-refresh_access_token", - ("url", auth_uri_string, String), + ("url", &local_config.relayer_url, String), ("count", num_refresh_access_token, i64), ); *access_token.lock().unwrap() = new_token; @@ -393,7 +422,7 @@ impl RelayerStage { num_full_refreshes += 1; datapoint_info!( "relayer_stage-tokens_generated", - ("url", auth_uri_string, String), + ("url", &local_config.relayer_url, String), ("count", num_full_refreshes, i64), ); refresh_token = new_token; @@ -455,4 +484,20 @@ impl RelayerStage { } Ok(()) } + + fn validate_relayer_config(config: &RelayerConfig) -> bool { + if config.relayer_url.is_empty() { + warn!("Can't connect to relayer. Missing or invalid url."); + return false; + } + if config.oldest_allowed_heartbeat.is_zero() { + warn!("Relayer oldest allowed heartbeat must be greater than 0."); + return false; + } + if config.expected_heartbeat_interval.is_zero() { + warn!("Relayer expected heartbeat interval must be greater than 0."); + return false; + } + true + } } diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 0cef388a9c..d112125613 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -71,9 +71,9 @@ pub struct Tpu { fetch_stage: FetchStage, sigverify_stage: SigVerifyStage, vote_sigverify_stage: SigVerifyStage, - maybe_relayer_stage: Option, - maybe_block_engine_stage: Option, - maybe_fetch_stage_manager: Option, + relayer_stage: RelayerStage, + block_engine_stage: BlockEngineStage, + fetch_stage_manager: FetchStageManager, banking_stage: BankingStage, cluster_info_vote_listener: ClusterInfoVoteListener, broadcast_stage: BroadcastStage, @@ -113,8 +113,8 @@ impl Tpu { keypair: &Keypair, log_messages_bytes_limit: Option, staked_nodes: &Arc>, - maybe_block_engine_config: Option, - maybe_relayer_config: Option, + block_engine_config: Arc>, + relayer_config: Arc>, tip_manager_config: TipManagerConfig, shred_receiver_address: Option, shared_staked_nodes_overrides: Arc>>, @@ -130,17 +130,11 @@ impl Tpu { transactions_forwards_quic: transactions_forwards_quic_sockets, } = sockets; + // Packets from fetch stage and quic server are intercepted and sent through fetch_stage_manager + // If relayer is connected, packets are dropped. If not, packets are forwarded on to packet_sender let (packet_intercept_sender, packet_intercept_receiver) = unbounded(); let (packet_sender, packet_receiver) = unbounded(); - // If there's a relayer, we need to redirect packets to the interceptor - // If not, they can flow straight through - let packet_send_channel = if maybe_relayer_config.is_some() { - packet_intercept_sender - } else { - packet_sender.clone() - }; - let (vote_packet_sender, vote_packet_receiver) = unbounded(); let (forwarded_packet_sender, forwarded_packet_receiver) = unbounded(); let fetch_stage = FetchStage::new_with_sender( @@ -148,7 +142,7 @@ impl Tpu { tpu_forwards_sockets, tpu_vote_sockets, exit, - &packet_send_channel, + &packet_intercept_sender, &vote_packet_sender, &forwarded_packet_sender, forwarded_packet_receiver, @@ -192,7 +186,7 @@ impl Tpu { transactions_quic_sockets, keypair, cluster_info.my_contact_info().tpu.ip(), - packet_send_channel, + packet_intercept_sender, exit.clone(), MAX_QUIC_CONNECTIONS_PER_PEER, staked_nodes.clone(), @@ -239,38 +233,33 @@ impl Tpu { })); let (bundle_sender, bundle_receiver) = unbounded(); - let maybe_block_engine_stage = maybe_block_engine_config.map(|block_engine_config| { - BlockEngineStage::new( - block_engine_config, - bundle_sender, - cluster_info.clone(), - packet_sender.clone(), - verified_sender.clone(), - exit.clone(), - &block_builder_fee_info, - ) - }); + let block_engine_stage = BlockEngineStage::new( + block_engine_config, + bundle_sender, + cluster_info.clone(), + packet_sender.clone(), + verified_sender.clone(), + exit.clone(), + &block_builder_fee_info, + ); let (heartbeat_tx, heartbeat_rx) = unbounded(); - let maybe_fetch_stage_manager = maybe_relayer_config.as_ref().map(|_| { - FetchStageManager::new( - cluster_info.clone(), - heartbeat_rx, - packet_intercept_receiver, - packet_sender.clone(), - exit.clone(), - ) - }); - let maybe_relayer_stage = maybe_relayer_config.map(|relayer_config| { - RelayerStage::new( - relayer_config, - cluster_info.clone(), - heartbeat_tx, - packet_sender, - verified_sender, - exit.clone(), - ) - }); + let fetch_stage_manager = FetchStageManager::new( + cluster_info.clone(), + heartbeat_rx, + packet_intercept_receiver, + packet_sender.clone(), + exit.clone(), + ); + + let relayer_stage = RelayerStage::new( + relayer_config, + cluster_info.clone(), + heartbeat_tx, + packet_sender, + verified_sender, + exit.clone(), + ); let (verified_gossip_vote_packets_sender, verified_gossip_vote_packets_receiver) = unbounded(); @@ -345,9 +334,9 @@ impl Tpu { fetch_stage, sigverify_stage, vote_sigverify_stage, - maybe_block_engine_stage, - maybe_relayer_stage, - maybe_fetch_stage_manager, + block_engine_stage, + relayer_stage, + fetch_stage_manager, banking_stage, cluster_info_vote_listener, broadcast_stage, @@ -375,15 +364,9 @@ impl Tpu { self.bundle_stage.join(), ]; - if let Some(relayer_stage) = self.maybe_relayer_stage { - relayer_stage.join()?; - } - if let Some(block_engine_stage) = self.maybe_block_engine_stage { - block_engine_stage.join()?; - } - if let Some(fetch_stage_manager) = self.maybe_fetch_stage_manager { - fetch_stage_manager.join()?; - } + self.relayer_stage.join()?; + self.block_engine_stage.join()?; + self.fetch_stage_manager.join()?; let broadcast_result = self.broadcast_stage.join(); for result in results { diff --git a/core/src/validator.rs b/core/src/validator.rs index 2730c7577a..5f5e6fca21 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -109,7 +109,7 @@ use { path::{Path, PathBuf}, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, - Arc, RwLock, + Arc, Mutex, RwLock, }, thread::{sleep, Builder, JoinHandle}, time::{Duration, Instant}, @@ -180,8 +180,8 @@ pub struct ValidatorConfig { pub ledger_column_options: LedgerColumnOptions, pub runtime_config: RuntimeConfig, pub replay_slots_concurrently: bool, - pub maybe_relayer_config: Option, - pub maybe_block_engine_config: Option, + pub relayer_config: Arc>, + pub block_engine_config: Arc>, pub shred_receiver_address: Option, pub tip_manager_config: TipManagerConfig, pub preallocated_bundle_cost: u64, @@ -250,8 +250,8 @@ impl Default for ValidatorConfig { ledger_column_options: LedgerColumnOptions::default(), runtime_config: RuntimeConfig::default(), replay_slots_concurrently: false, - maybe_relayer_config: None, - maybe_block_engine_config: None, + relayer_config: Arc::new(Mutex::new(RelayerConfig::default())), + block_engine_config: Arc::new(Mutex::new(BlockEngineConfig::default())), shred_receiver_address: None, tip_manager_config: TipManagerConfig::default(), preallocated_bundle_cost: u64::default(), @@ -1041,8 +1041,8 @@ impl Validator { &identity_keypair, config.runtime_config.log_messages_bytes_limit, &staked_nodes, - config.maybe_block_engine_config.clone(), - config.maybe_relayer_config.clone(), + config.block_engine_config.clone(), + config.relayer_config.clone(), config.tip_manager_config.clone(), config.shred_receiver_address, config.staked_nodes_overrides.clone(), diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index 780089d291..a75c45e41f 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -67,8 +67,8 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { ledger_column_options: config.ledger_column_options.clone(), runtime_config: config.runtime_config.clone(), replay_slots_concurrently: config.replay_slots_concurrently, - maybe_relayer_config: config.maybe_relayer_config.clone(), - maybe_block_engine_config: config.maybe_block_engine_config.clone(), + relayer_config: config.relayer_config.clone(), + block_engine_config: config.block_engine_config.clone(), shred_receiver_address: config.shred_receiver_address, tip_manager_config: config.tip_manager_config.clone(), preallocated_bundle_cost: config.preallocated_bundle_cost, diff --git a/validator/src/admin_rpc_service.rs b/validator/src/admin_rpc_service.rs index 75a3971c7c..cbcb1a6f89 100644 --- a/validator/src/admin_rpc_service.rs +++ b/validator/src/admin_rpc_service.rs @@ -7,7 +7,10 @@ use { log::*, serde::{de::Deserializer, Deserialize, Serialize}, solana_core::{ - consensus::Tower, tower_storage::TowerStorage, validator::ValidatorStartProgress, + consensus::Tower, + proxy::{block_engine_stage::BlockEngineConfig, relayer_stage::RelayerConfig}, + tower_storage::TowerStorage, + validator::ValidatorStartProgress, }, solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}, solana_runtime::bank_forks::BankForks, @@ -22,7 +25,7 @@ use { fmt::{self, Display}, net::SocketAddr, path::{Path, PathBuf}, - sync::{Arc, RwLock}, + sync::{Arc, Mutex, RwLock}, thread::{self, Builder}, time::{Duration, SystemTime}, }, @@ -33,6 +36,8 @@ pub struct AdminRpcRequestMetadataPostInit { pub cluster_info: Arc, pub bank_forks: Arc>, pub vote_account: Pubkey, + pub relayer_config: Arc>, + pub block_engine_config: Arc>, } #[derive(Clone)] @@ -183,6 +188,24 @@ pub trait AdminRpc { #[rpc(meta, name = "contactInfo")] fn contact_info(&self, meta: Self::Metadata) -> Result; + + #[rpc(meta, name = "setBlockEngineConfig")] + fn set_block_engine_config( + &self, + meta: Self::Metadata, + block_engine_url: String, + trust_packets: bool, + ) -> Result<()>; + + #[rpc(meta, name = "setRelayerConfig")] + fn set_relayer_config( + &self, + meta: Self::Metadata, + relayer_url: String, + trust_packets: bool, + expected_heartbeat_interval_ms: u64, + max_failed_heartbeats: u64, + ) -> Result<()>; } pub struct AdminRpcImpl; @@ -267,6 +290,29 @@ impl AdminRpc for AdminRpcImpl { Ok(()) } + fn set_block_engine_config( + &self, + meta: Self::Metadata, + block_engine_url: String, + trust_packets: bool, + ) -> Result<()> { + debug!("set_block_engine_config request received"); + + if block_engine_url.contains("http") { + meta.with_post_init(|post_init| { + *post_init.block_engine_config.lock().unwrap() = BlockEngineConfig { + block_engine_url, + trust_packets, + }; + Ok(()) + }) + } else { + Err(jsonrpc_core::error::Error::invalid_params( + "block_engine_url must point to an http(s) connection.", + )) + } + } + fn set_identity( &self, meta: Self::Metadata, @@ -303,6 +349,39 @@ impl AdminRpc for AdminRpcImpl { AdminRpcImpl::set_identity_keypair(meta, identity_keypair, require_tower) } + fn set_relayer_config( + &self, + meta: Self::Metadata, + relayer_url: String, + trust_packets: bool, + expected_heartbeat_interval_ms: u64, + max_failed_heartbeats: u64, + ) -> Result<()> { + debug!("set_relayer_config request received"); + + if relayer_url.contains("http") { + meta.with_post_init(|post_init| { + let expected_heartbeat_interval = + Duration::from_millis(expected_heartbeat_interval_ms); + + let oldest_allowed_heartbeat = + Duration::from_millis(max_failed_heartbeats * expected_heartbeat_interval_ms); + + *post_init.relayer_config.lock().unwrap() = RelayerConfig { + relayer_url, + expected_heartbeat_interval, + oldest_allowed_heartbeat, + trust_packets, + }; + Ok(()) + }) + } else { + Err(jsonrpc_core::error::Error::invalid_params( + "relayer_url must point to an http(s) connection.", + )) + } + } + fn set_staked_nodes_overrides(&self, meta: Self::Metadata, path: String) -> Result<()> { let loaded_config = load_staked_nodes_overrides(&path) .map_err(|err| { diff --git a/validator/src/bin/solana-test-validator.rs b/validator/src/bin/solana-test-validator.rs index 1b8a05ae7b..b224cb04f0 100644 --- a/validator/src/bin/solana-test-validator.rs +++ b/validator/src/bin/solana-test-validator.rs @@ -9,7 +9,10 @@ use { normalize_to_url_if_moniker, }, }, - solana_core::tower_storage::FileTowerStorage, + solana_core::{ + proxy::{block_engine_stage::BlockEngineConfig, relayer_stage::RelayerConfig}, + tower_storage::FileTowerStorage, + }, solana_faucet::faucet::{run_local_faucet_with_port, FAUCET_PORT}, solana_rpc::{ rpc::{JsonRpcConfig, RpcBigtableConfig}, @@ -39,7 +42,7 @@ use { net::{IpAddr, Ipv4Addr, SocketAddr}, path::{Path, PathBuf}, process::exit, - sync::{Arc, RwLock}, + sync::{Arc, Mutex, RwLock}, time::{Duration, SystemTime, UNIX_EPOCH}, }, }; @@ -858,6 +861,8 @@ fn main() { bank_forks: test_validator.bank_forks(), cluster_info: test_validator.cluster_info(), vote_account: test_validator.vote_account_address(), + block_engine_config: Arc::new(Mutex::new(BlockEngineConfig::default())), + relayer_config: Arc::new(Mutex::new(RelayerConfig::default())), }); if let Some(dashboard) = dashboard { dashboard.run(Duration::from_millis(250)); diff --git a/validator/src/main.rs b/validator/src/main.rs index 52477a7e4f..49e00d39d6 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1,4 +1,5 @@ #![allow(clippy::integer_arithmetic)] + #[cfg(not(target_env = "msvc"))] use jemallocator::Jemalloc; use { @@ -91,10 +92,9 @@ use { path::{Path, PathBuf}, process::exit, str::FromStr, - sync::{Arc, RwLock}, + sync::{Arc, Mutex, RwLock}, time::{Duration, SystemTime}, }, - tonic::transport::Endpoint, }; #[cfg(not(target_env = "msvc"))] @@ -115,6 +115,8 @@ const DEFAULT_MIN_SNAPSHOT_DOWNLOAD_SPEED: u64 = 10485760; const MAX_SNAPSHOT_DOWNLOAD_ABORT: u32 = 5; const MILLIS_PER_SECOND: u64 = 1000; const DEFAULT_PREALLOCATED_BUNDLE_COST: u64 = 3000000; +const DEFAULT_RELAYER_EXPECTED_HEARTBEAT_INTERVAL_MS: &str = "500"; +const DEFAULT_RELAYER_MAX_FAILED_HEARTBEATS: &str = "3"; fn monitor_validator(ledger_path: &Path) { let dashboard = Dashboard::new(ledger_path, None, None).unwrap_or_else(|err| { @@ -1806,12 +1808,14 @@ pub fn main() { .long("relayer-expected-heartbeat-interval-ms") .takes_value(true) .help("Interval at which the Relayer is expected to send heartbeat messages.") + .default_value(DEFAULT_RELAYER_EXPECTED_HEARTBEAT_INTERVAL_MS) ) .arg( Arg::with_name("relayer_max_failed_heartbeats") .long("relayer-max-failed-heartbeats") .takes_value(true) .help("Maximum number of heartbeats the Relayer can miss before falling back to the normal TPU pipeline.") + .default_value(DEFAULT_RELAYER_MAX_FAILED_HEARTBEATS) ) .arg( Arg::with_name("trust_block_engine_packets") @@ -1970,6 +1974,23 @@ pub fn main() { SubCommand::with_name("run") .about("Run the validator") ) + .subcommand( + SubCommand::with_name("set-block-engine-config") + .about("Set configuration for connection to a block engine") + .arg( + Arg::with_name("block_engine_url") + .long("block-engine-url") + .help("Block engine url. Set to empty string to disable block engine connection.") + .takes_value(true) + .required(true) + ) + .arg( + Arg::with_name("trust_block_engine_packets") + .long("trust-block-engine-packets") + .takes_value(false) + .help("Skip signature verification on block engine packets. Not recommended unless the block engine is trusted.") + ) + ) .subcommand( SubCommand::with_name("set-identity") .about("Set the validator identity") @@ -2003,6 +2024,39 @@ pub fn main() { ) .after_help("Note: the new filter only applies to the currently running validator instance") ) + .subcommand( + SubCommand::with_name("set-relayer-config") + .about("Set configuration for connection to a relayer") + .arg( + Arg::with_name("relayer_url") + .long("relayer-url") + .help("Relayer url. Set to empty string to disable relayer connection.") + .takes_value(true) + .required(true) + ) + .arg( + Arg::with_name("trust_relayer_packets") + .long("trust-relayer-packets") + .takes_value(false) + .help("Skip signature verification on relayer packets. Not recommended unless the relayer is trusted.") + ) + .arg( + Arg::with_name("relayer_expected_heartbeat_interval_ms") + .long("relayer-expected-heartbeat-interval-ms") + .takes_value(true) + .help("Interval at which the Relayer is expected to send heartbeat messages.") + .required(false) + .default_value(DEFAULT_RELAYER_EXPECTED_HEARTBEAT_INTERVAL_MS) + ) + .arg( + Arg::with_name("relayer_max_failed_heartbeats") + .long("relayer-max-failed-heartbeats") + .takes_value(true) + .help("Maximum number of heartbeats the Relayer can miss before falling back to the normal TPU pipeline.") + .required(false) + .default_value(DEFAULT_RELAYER_MAX_FAILED_HEARTBEATS) + ) + ) .subcommand( SubCommand::with_name("staked-nodes-overrides") .about("Overrides stakes of specific node identities.") @@ -2225,6 +2279,23 @@ pub fn main() { }); return; } + ("set-block-engine-config", Some(subcommand_matches)) => { + let block_engine_url = value_t_or_exit!(subcommand_matches, "block_engine_url", String); + let trust_packets = subcommand_matches.is_present("trust_block_engine_packets"); + let admin_client = admin_rpc_service::connect(&ledger_path); + admin_rpc_service::runtime() + .block_on(async move { + admin_client + .await? + .set_block_engine_config(block_engine_url, trust_packets) + .await + }) + .unwrap_or_else(|err| { + println!("set block engine config failed: {}", err); + exit(1); + }); + return; + } ("set-identity", Some(subcommand_matches)) => { let require_tower = subcommand_matches.is_present("require_tower"); @@ -2288,6 +2359,37 @@ pub fn main() { }); return; } + ("set-relayer-config", Some(subcommand_matches)) => { + let relayer_url = value_t_or_exit!(subcommand_matches, "relayer_url", String); + let trust_packets = subcommand_matches.is_present("trust_relayer_packets"); + let expected_heartbeat_interval_ms: u64 = + value_of(subcommand_matches, "relayer_expected_heartbeat_interval_ms").unwrap_or( + DEFAULT_RELAYER_EXPECTED_HEARTBEAT_INTERVAL_MS + .parse() + .unwrap(), + ); + let max_failed_heartbeats: u64 = + value_of(subcommand_matches, "relayer_max_failed_heartbeats") + .unwrap_or(DEFAULT_RELAYER_MAX_FAILED_HEARTBEATS.parse().unwrap()); + let admin_client = admin_rpc_service::connect(&ledger_path); + admin_rpc_service::runtime() + .block_on(async move { + admin_client + .await? + .set_relayer_config( + relayer_url, + trust_packets, + expected_heartbeat_interval_ms, + max_failed_heartbeats, + ) + .await + }) + .unwrap_or_else(|err| { + println!("set relayer config failed: {}", err); + exit(1); + }); + return; + } ("wait-for-restart-window", Some(subcommand_matches)) => { let min_idle_time = value_t_or_exit!(subcommand_matches, "min_idle_time", usize); let identity = pubkey_of(subcommand_matches, "identity"); @@ -2666,91 +2768,88 @@ pub fn main() { let voting_disabled = matches.is_present("no_voting") || restricted_repair_only_mode; let tip_manager_config = tip_manager_config_from_matches(&matches, voting_disabled); - let is_block_engine_enabled = matches.is_present("block_engine_url") - || matches.is_present("block_engine_address") - || matches.is_present("block_engine_auth_service_address") - || matches.is_present("trust_block_engine_packets"); - let maybe_block_engine_config = is_block_engine_enabled.then(|| { - let addr: String = value_of(&matches, "block_engine_url").unwrap_or_else(|| { - value_of(&matches, "block_engine_auth_service_address") - .expect("missing block-engine-url") - }); - let mut auth_service_endpoint = - Endpoint::from_shared(addr.clone()).expect("invalid block-engine-url value"); - if addr.contains("https") { - auth_service_endpoint = auth_service_endpoint - .tls_config(tonic::transport::ClientTlsConfig::new()) - .expect("failed to set tls_config"); - } - - let addr: String = value_of(&matches, "block_engine_url").unwrap_or_else(|| { - value_of(&matches, "block_engine_address").expect("missing block-engine-url") - }); - let mut backend_endpoint = Endpoint::from_shared(addr.clone()) - .expect("invalid block-engine-address value") - .tcp_keepalive(Some(Duration::from_secs(60))); - if addr.contains("https") { - backend_endpoint = backend_endpoint - .tls_config(tonic::transport::ClientTlsConfig::new()) - .expect("failed to set tls_config"); - } - - BlockEngineConfig { - auth_service_endpoint, - backend_endpoint, - trust_packets: matches.is_present("trust_block_engine_packets"), - } - }); - - let is_relayer_enabled = matches.is_present("relayer_url") - || matches.is_present("relayer_auth_service_address") - || matches.is_present("relayer_address") - || matches.is_present("trust_relayer_packets") - || matches.is_present("relayer_expected_heartbeat_interval_ms") - || matches.is_present("relayer_max_failed_heartbeats"); - let maybe_relayer_config = is_relayer_enabled.then(|| { - let addr: String = value_of(&matches, "relayer_url").unwrap_or_else(|| { - value_of(&matches, "relayer_auth_service_address").expect("missing relayer-url") - }); - let mut auth_service_endpoint = - Endpoint::from_shared(addr.clone()).expect("invalid relayer-url value"); - if addr.contains("https") { - auth_service_endpoint = auth_service_endpoint - .tls_config(tonic::transport::ClientTlsConfig::new()) - .expect("failed to set tls_config"); + let mut block_engine_config = BlockEngineConfig::default(); + if matches.is_present("block_engine_url") { + block_engine_config.block_engine_url = + value_of(&matches, "block_engine_url").expect("couldn't parse block_engine_url"); + } else { + let error_msg = "Specifying seperate auth and backend addresses for block engine is deprecated. Recommended to use --block_engine_url instead.\ + If using block_engine_auth_service_address and block_engine_address, they must both be provided and set to the same value."; + match ( + matches.is_present("block_engine_auth_service_address"), + matches.is_present("block_engine_address"), + ) { + (true, true) => { + let auth_addr: String = value_of(&matches, "block_engine_auth_service_address") + .expect("couldn't parse block_engine_auth_service_address"); + let backend_addr: String = value_of(&matches, "block_engine_address") + .expect("couldn't parse block_engine_address"); + if auth_addr != backend_addr { + eprintln!("{}", error_msg); + exit(1); + } else { + block_engine_config.block_engine_url = backend_addr; + } + } + (false, false) => {} + _ => { + eprintln!("{}", error_msg); + exit(1); + } } + } + block_engine_config.trust_packets = matches.is_present("trust_block_engine_packets"); - let addr: String = value_of(&matches, "relayer_url") - .unwrap_or_else(|| value_of(&matches, "relayer_address").expect("missing relayer-url")); - let mut backend_endpoint = - Endpoint::from_shared(addr.clone()).expect("invalid relayer-address value"); - if addr.contains("https") { - backend_endpoint = backend_endpoint - .tls_config(tonic::transport::ClientTlsConfig::new()) - .expect("failed to set tls_config"); + let mut relayer_config = RelayerConfig::default(); + if matches.is_present("relayer_url") { + relayer_config.relayer_url = + value_of(&matches, "relayer_url").expect("couldn't parse relayer_url"); + } else { + let error_msg = "Specifying seperate auth and backend addresses for relayer is deprecated. Recommended to use --relayer_url instead.\ + If using relayer_auth_service_address and relayer_address, they must both be provided and set to the same value."; + match ( + matches.is_present("relayer_auth_service_address"), + matches.is_present("relayer_address"), + ) { + (true, true) => { + let auth_addr: String = value_of(&matches, "relayer_auth_service_address") + .expect("couldn't parse relayer_auth_service_address"); + let backend_addr: String = + value_of(&matches, "relayer_address").expect("couldn't parse relayer_address"); + if auth_addr != backend_addr { + eprintln!("{}", error_msg); + exit(1); + } else { + relayer_config.relayer_url = backend_addr; + } + } + (false, false) => {} + _ => { + eprintln!("{}", error_msg); + exit(1); + } } + } + relayer_config.trust_packets = matches.is_present("trust_relayer_packets"); - let expected_heartbeat_interval_ms = - value_of(&matches, "relayer_expected_heartbeat_interval_ms").unwrap_or(500); - let expected_heartbeat_interval = Duration::from_millis(expected_heartbeat_interval_ms); - - let max_failed_heartbeats = - value_of(&matches, "relayer_max_failed_heartbeats").unwrap_or(3); - assert!( - max_failed_heartbeats > 0, - "relayer-max-failed-heartbeats must be greater than zero" + let expected_heartbeat_interval_ms: u64 = + value_of(&matches, "relayer_expected_heartbeat_interval_ms").unwrap_or( + DEFAULT_RELAYER_EXPECTED_HEARTBEAT_INTERVAL_MS + .parse() + .unwrap(), ); - let oldest_allowed_heartbeat = - Duration::from_millis(max_failed_heartbeats * expected_heartbeat_interval_ms); - - RelayerConfig { - auth_service_endpoint, - backend_endpoint, - expected_heartbeat_interval, - oldest_allowed_heartbeat, - trust_packets: matches.is_present("trust_relayer_packets"), - } - }); + let expected_heartbeat_interval = Duration::from_millis(expected_heartbeat_interval_ms); + relayer_config.expected_heartbeat_interval = expected_heartbeat_interval; + + let max_failed_heartbeats: u64 = value_of(&matches, "relayer_max_failed_heartbeats") + .unwrap_or(DEFAULT_RELAYER_MAX_FAILED_HEARTBEATS.parse().unwrap()); + assert!( + max_failed_heartbeats > 0, + "relayer-max-failed-heartbeats must be greater than zero" + ); + let oldest_allowed_heartbeat = + Duration::from_millis(max_failed_heartbeats * expected_heartbeat_interval_ms); + relayer_config.oldest_allowed_heartbeat = oldest_allowed_heartbeat; let mut validator_config = ValidatorConfig { require_tower: matches.is_present("require_tower"), @@ -2885,8 +2984,8 @@ pub fn main() { log_messages_bytes_limit: value_of(&matches, "log_messages_bytes_limit"), ..RuntimeConfig::default() }, - maybe_relayer_config, - maybe_block_engine_config, + relayer_config: Arc::new(Mutex::new(relayer_config)), + block_engine_config: Arc::new(Mutex::new(block_engine_config)), tip_manager_config, shred_receiver_address: matches .value_of("shred_receiver_address") @@ -3339,6 +3438,8 @@ pub fn main() { bank_forks: validator.bank_forks.clone(), cluster_info: validator.cluster_info.clone(), vote_account, + relayer_config: validator_config.relayer_config, + block_engine_config: validator_config.block_engine_config, }); if let Some(filename) = init_complete_file { From c764f10328009b56375564c934ce828a5a7e9257 Mon Sep 17 00:00:00 2001 From: Jed <4679729+jedleggett@users.noreply.github.com> Date: Tue, 16 May 2023 13:33:21 -0500 Subject: [PATCH 2/9] fix emptpy input --- validator/src/admin_rpc_service.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/validator/src/admin_rpc_service.rs b/validator/src/admin_rpc_service.rs index cbcb1a6f89..3c451136aa 100644 --- a/validator/src/admin_rpc_service.rs +++ b/validator/src/admin_rpc_service.rs @@ -298,7 +298,7 @@ impl AdminRpc for AdminRpcImpl { ) -> Result<()> { debug!("set_block_engine_config request received"); - if block_engine_url.contains("http") { + if block_engine_url.contains("http") || block_engine_url.is_empty() { meta.with_post_init(|post_init| { *post_init.block_engine_config.lock().unwrap() = BlockEngineConfig { block_engine_url, @@ -359,7 +359,7 @@ impl AdminRpc for AdminRpcImpl { ) -> Result<()> { debug!("set_relayer_config request received"); - if relayer_url.contains("http") { + if relayer_url.contains("http") || relayer_url.is_empty() { meta.with_post_init(|post_init| { let expected_heartbeat_interval = Duration::from_millis(expected_heartbeat_interval_ms); From 53eb2e1bf167fa52a66af9b67e46c537ea25483b Mon Sep 17 00:00:00 2001 From: Jed <4679729+jedleggett@users.noreply.github.com> Date: Wed, 17 May 2023 09:56:09 -0500 Subject: [PATCH 3/9] feedback non breaking --- core/src/proxy/block_engine_stage.rs | 34 ++++++++++++-------------- core/src/proxy/relayer_stage.rs | 36 +++++++++++++--------------- validator/src/main.rs | 33 ++++++++++--------------- 3 files changed, 45 insertions(+), 58 deletions(-) diff --git a/core/src/proxy/block_engine_stage.rs b/core/src/proxy/block_engine_stage.rs index cf80d1a919..5c640ea53f 100644 --- a/core/src/proxy/block_engine_stage.rs +++ b/core/src/proxy/block_engine_stage.rs @@ -46,7 +46,6 @@ use { const CONNECTION_TIMEOUT_S: u64 = 10; const CONNECTION_BACKOFF_S: u64 = 5; -const CONFIG_BACKOFF_S: u64 = 30; #[derive(Default)] struct BlockEngineStageStats { @@ -103,7 +102,7 @@ impl BlockEngineStage { let block_builder_fee_info = block_builder_fee_info.clone(); let thread = Builder::new() - .name("block-engine-stage".into()) + .name("block-engine-stage".to_string()) .spawn(move || { let rt = tokio::runtime::Builder::new_multi_thread() .enable_all() @@ -145,14 +144,13 @@ impl BlockEngineStage { ) { const CONNECTION_TIMEOUT: Duration = Duration::from_secs(CONNECTION_TIMEOUT_S); const CONNECTION_BACKOFF: Duration = Duration::from_secs(CONNECTION_BACKOFF_S); - const CONFIG_BACKOFF: Duration = Duration::from_secs(CONFIG_BACKOFF_S); let mut error_count: u64 = 0; while !exit.load(Ordering::Relaxed) { // Wait until a valid config is supplied (either initially or by admin rpc) - // Use if!/else here to avoid extra CONFIG_BACKOFF wait on successful termination + // Use if!/else here to avoid extra CONNECTION_BACKOFF wait on successful termination if !Self::validate_block_engine_config(&block_engine_config.lock().unwrap()) { - sleep(CONFIG_BACKOFF).await; + sleep(CONNECTION_BACKOFF).await; } else if let Err(e) = Self::connect_auth_and_stream( &block_engine_config, &cluster_info, @@ -201,15 +199,17 @@ impl BlockEngineStage { let mut auth_service_endpoint = Endpoint::from_shared(local_config.block_engine_url.clone()).map_err(|_| { - ProxyError::AuthenticationConnectionError( - "invalid block engine url value".parse().unwrap(), - ) + ProxyError::AuthenticationConnectionError(format!( + "invalid block engine url value: {}", + local_config.block_engine_url + )) })?; let mut backend_endpoint = Endpoint::from_shared(local_config.block_engine_url.clone()) .map_err(|_| { - ProxyError::BlockEngineConnectionError( - "invalid block engine url value".parse().unwrap(), - ) + ProxyError::BlockEngineConnectionError(format!( + "invalid block engine url value: {}", + local_config.block_engine_url + )) })? .tcp_keepalive(Some(Duration::from_secs(60))); @@ -218,18 +218,14 @@ impl BlockEngineStage { .tls_config(tonic::transport::ClientTlsConfig::new()) .map_err(|_| { ProxyError::AuthenticationConnectionError( - "failed to set tls_config for block engine auth service" - .parse() - .unwrap(), + "failed to set tls_config for block engine auth service".to_string(), ) })?; backend_endpoint = backend_endpoint .tls_config(tonic::transport::ClientTlsConfig::new()) .map_err(|_| { ProxyError::BlockEngineConnectionError( - "failed to set tls_config for block engine service" - .parse() - .unwrap(), + "failed to set tls_config for block engine service".to_string(), ) })?; } @@ -408,11 +404,11 @@ impl BlockEngineStage { block_engine_stats = BlockEngineStageStats::default(); if cluster_info.id() != keypair.pubkey() { - return Err(ProxyError::AuthenticationConnectionError("Validator ID Changed".to_string())); + return Err(ProxyError::AuthenticationConnectionError("validator identity changed".to_string())); } if *global_config.lock().unwrap() != *local_config { - return Err(ProxyError::AuthenticationConnectionError("Block Engine Config Changed".to_string())); + return Err(ProxyError::AuthenticationConnectionError("block engine config changed".to_string())); } let (maybe_new_access, maybe_new_refresh) = maybe_refresh_auth_tokens(&mut auth_client, diff --git a/core/src/proxy/relayer_stage.rs b/core/src/proxy/relayer_stage.rs index dbc3d8252f..80d680536d 100644 --- a/core/src/proxy/relayer_stage.rs +++ b/core/src/proxy/relayer_stage.rs @@ -47,7 +47,6 @@ use { const CONNECTION_TIMEOUT_S: u64 = 10; const CONNECTION_BACKOFF_S: u64 = 5; -const CONFIG_BACKOFF_S: u64 = 30; #[derive(Default)] struct RelayerStageStats { @@ -100,7 +99,7 @@ impl RelayerStage { exit: Arc, ) -> Self { let thread = Builder::new() - .name("relayer-stage".into()) + .name("relayer-stage".to_string()) .spawn(move || { let rt = tokio::runtime::Builder::new_multi_thread() .enable_all() @@ -141,15 +140,14 @@ impl RelayerStage { ) { const CONNECTION_TIMEOUT: Duration = Duration::from_secs(CONNECTION_TIMEOUT_S); const CONNECTION_BACKOFF: Duration = Duration::from_secs(CONNECTION_BACKOFF_S); - const CONFIG_BACKOFF: Duration = Duration::from_secs(CONFIG_BACKOFF_S); let mut error_count: u64 = 0; while !exit.load(Ordering::Relaxed) { // Wait until a valid config is supplied (either initially or by admin rpc) - // Use if!/else here to avoid extra CONFIG_BACKOFF wait on successful termination + // Use if!/else here to avoid extra CONNECTION_BACKOFF wait on successful termination if !Self::validate_relayer_config(&relayer_config.lock().unwrap()) { - sleep(CONFIG_BACKOFF).await; + sleep(CONNECTION_BACKOFF).await; } else if let Err(e) = Self::connect_auth_and_stream( &relayer_config, &cluster_info, @@ -196,13 +194,17 @@ impl RelayerStage { let mut auth_service_endpoint = Endpoint::from_shared(local_config.relayer_url.clone()) .map_err(|_| { - ProxyError::AuthenticationConnectionError( - "invalid relayer url value".parse().unwrap(), - ) + ProxyError::AuthenticationConnectionError(format!( + "invalid relayer url value: {}", + local_config.relayer_url + )) })?; let mut backend_endpoint = Endpoint::from_shared(local_config.relayer_url.clone()) .map_err(|_| { - ProxyError::RelayerConnectionError("invalid relayer url value".parse().unwrap()) + ProxyError::RelayerConnectionError(format!( + "invalid relayer url value: {}", + local_config.relayer_url + )) })? .tcp_keepalive(Some(Duration::from_secs(60))); if local_config.relayer_url.contains("https") { @@ -210,18 +212,14 @@ impl RelayerStage { .tls_config(tonic::transport::ClientTlsConfig::new()) .map_err(|_| { ProxyError::AuthenticationConnectionError( - "failed to set tls_config for relayer auth service" - .parse() - .unwrap(), + "failed to set tls_config for relayer auth service".to_string(), ) })?; backend_endpoint = backend_endpoint .tls_config(tonic::transport::ClientTlsConfig::new()) .map_err(|_| { ProxyError::RelayerConnectionError( - "failed to set tls_config for relayer service" - .parse() - .unwrap(), + "failed to set tls_config for relayer service".to_string(), ) })?; } @@ -306,10 +304,10 @@ impl RelayerStage { let tpu_addr = tpu_config .tpu - .ok_or_else(|| ProxyError::MissingTpuSocket("tpu".into()))?; + .ok_or_else(|| ProxyError::MissingTpuSocket("tpu".to_string()))?; let tpu_forward_addr = tpu_config .tpu_forward - .ok_or_else(|| ProxyError::MissingTpuSocket("tpu_fwd".into()))?; + .ok_or_else(|| ProxyError::MissingTpuSocket("tpu_fwd".to_string()))?; let tpu_ip = IpAddr::from(tpu_addr.ip.parse::()?); let tpu_forward_ip = IpAddr::from(tpu_forward_addr.ip.parse::()?); @@ -394,11 +392,11 @@ impl RelayerStage { relayer_stats = RelayerStageStats::default(); if cluster_info.id() != keypair.pubkey() { - return Err(ProxyError::AuthenticationConnectionError("Validator ID Changed".to_string())); + return Err(ProxyError::AuthenticationConnectionError("validator identity changed".to_string())); } if *global_config.lock().unwrap() != *local_config { - return Err(ProxyError::AuthenticationConnectionError("Relayer Config Changed".to_string())); + return Err(ProxyError::AuthenticationConnectionError("relayer config changed".to_string())); } let (maybe_new_access, maybe_new_refresh) = maybe_refresh_auth_tokens(&mut auth_client, diff --git a/validator/src/main.rs b/validator/src/main.rs index 49e00d39d6..7583f97cb5 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -2363,14 +2363,9 @@ pub fn main() { let relayer_url = value_t_or_exit!(subcommand_matches, "relayer_url", String); let trust_packets = subcommand_matches.is_present("trust_relayer_packets"); let expected_heartbeat_interval_ms: u64 = - value_of(subcommand_matches, "relayer_expected_heartbeat_interval_ms").unwrap_or( - DEFAULT_RELAYER_EXPECTED_HEARTBEAT_INTERVAL_MS - .parse() - .unwrap(), - ); + value_of(subcommand_matches, "relayer_expected_heartbeat_interval_ms").unwrap(); let max_failed_heartbeats: u64 = - value_of(subcommand_matches, "relayer_max_failed_heartbeats") - .unwrap_or(DEFAULT_RELAYER_MAX_FAILED_HEARTBEATS.parse().unwrap()); + value_of(subcommand_matches, "relayer_max_failed_heartbeats").unwrap(); let admin_client = admin_rpc_service::connect(&ledger_path); admin_rpc_service::runtime() .block_on(async move { @@ -2768,7 +2763,10 @@ pub fn main() { let voting_disabled = matches.is_present("no_voting") || restricted_repair_only_mode; let tip_manager_config = tip_manager_config_from_matches(&matches, voting_disabled); - let mut block_engine_config = BlockEngineConfig::default(); + let mut block_engine_config = BlockEngineConfig { + trust_packets: matches.is_present("trust_block_engine_packets"), + ..Default::default() + }; if matches.is_present("block_engine_url") { block_engine_config.block_engine_url = value_of(&matches, "block_engine_url").expect("couldn't parse block_engine_url"); @@ -2798,9 +2796,11 @@ pub fn main() { } } } - block_engine_config.trust_packets = matches.is_present("trust_block_engine_packets"); - let mut relayer_config = RelayerConfig::default(); + let mut relayer_config = RelayerConfig { + trust_packets: matches.is_present("trust_relayer_packets"), + ..Default::default() + }; if matches.is_present("relayer_url") { relayer_config.relayer_url = value_of(&matches, "relayer_url").expect("couldn't parse relayer_url"); @@ -2830,26 +2830,19 @@ pub fn main() { } } } - relayer_config.trust_packets = matches.is_present("trust_relayer_packets"); let expected_heartbeat_interval_ms: u64 = - value_of(&matches, "relayer_expected_heartbeat_interval_ms").unwrap_or( - DEFAULT_RELAYER_EXPECTED_HEARTBEAT_INTERVAL_MS - .parse() - .unwrap(), - ); + value_of(&matches, "relayer_expected_heartbeat_interval_ms").unwrap(); let expected_heartbeat_interval = Duration::from_millis(expected_heartbeat_interval_ms); relayer_config.expected_heartbeat_interval = expected_heartbeat_interval; - let max_failed_heartbeats: u64 = value_of(&matches, "relayer_max_failed_heartbeats") - .unwrap_or(DEFAULT_RELAYER_MAX_FAILED_HEARTBEATS.parse().unwrap()); + let max_failed_heartbeats: u64 = value_of(&matches, "relayer_max_failed_heartbeats").unwrap(); assert!( max_failed_heartbeats > 0, "relayer-max-failed-heartbeats must be greater than zero" ); - let oldest_allowed_heartbeat = + relayer_config.oldest_allowed_heartbeat = Duration::from_millis(max_failed_heartbeats * expected_heartbeat_interval_ms); - relayer_config.oldest_allowed_heartbeat = oldest_allowed_heartbeat; let mut validator_config = ValidatorConfig { require_tower: matches.is_present("require_tower"), From a0e17aba1bd1b039a09fbe9f9f382b89fad5f460 Mon Sep 17 00:00:00 2001 From: Jed <4679729+jedleggett@users.noreply.github.com> Date: Wed, 17 May 2023 10:19:09 -0500 Subject: [PATCH 4/9] feedback breaking --- validator/src/main.rs | 55 ++++--------------------------------------- 1 file changed, 4 insertions(+), 51 deletions(-) diff --git a/validator/src/main.rs b/validator/src/main.rs index 7583f97cb5..018d9e4d9b 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -2770,31 +2770,6 @@ pub fn main() { if matches.is_present("block_engine_url") { block_engine_config.block_engine_url = value_of(&matches, "block_engine_url").expect("couldn't parse block_engine_url"); - } else { - let error_msg = "Specifying seperate auth and backend addresses for block engine is deprecated. Recommended to use --block_engine_url instead.\ - If using block_engine_auth_service_address and block_engine_address, they must both be provided and set to the same value."; - match ( - matches.is_present("block_engine_auth_service_address"), - matches.is_present("block_engine_address"), - ) { - (true, true) => { - let auth_addr: String = value_of(&matches, "block_engine_auth_service_address") - .expect("couldn't parse block_engine_auth_service_address"); - let backend_addr: String = value_of(&matches, "block_engine_address") - .expect("couldn't parse block_engine_address"); - if auth_addr != backend_addr { - eprintln!("{}", error_msg); - exit(1); - } else { - block_engine_config.block_engine_url = backend_addr; - } - } - (false, false) => {} - _ => { - eprintln!("{}", error_msg); - exit(1); - } - } } let mut relayer_config = RelayerConfig { @@ -2804,35 +2779,13 @@ pub fn main() { if matches.is_present("relayer_url") { relayer_config.relayer_url = value_of(&matches, "relayer_url").expect("couldn't parse relayer_url"); - } else { - let error_msg = "Specifying seperate auth and backend addresses for relayer is deprecated. Recommended to use --relayer_url instead.\ - If using relayer_auth_service_address and relayer_address, they must both be provided and set to the same value."; - match ( - matches.is_present("relayer_auth_service_address"), - matches.is_present("relayer_address"), - ) { - (true, true) => { - let auth_addr: String = value_of(&matches, "relayer_auth_service_address") - .expect("couldn't parse relayer_auth_service_address"); - let backend_addr: String = - value_of(&matches, "relayer_address").expect("couldn't parse relayer_address"); - if auth_addr != backend_addr { - eprintln!("{}", error_msg); - exit(1); - } else { - relayer_config.relayer_url = backend_addr; - } - } - (false, false) => {} - _ => { - eprintln!("{}", error_msg); - exit(1); - } - } } - let expected_heartbeat_interval_ms: u64 = value_of(&matches, "relayer_expected_heartbeat_interval_ms").unwrap(); + assert!( + max_failed_heartbeats > 0, + "relayer-max-failed-heartbeats must be greater than zero" + ); let expected_heartbeat_interval = Duration::from_millis(expected_heartbeat_interval_ms); relayer_config.expected_heartbeat_interval = expected_heartbeat_interval; From 09c16177639dd266d315982cbafd2a5b3f4f8a98 Mon Sep 17 00:00:00 2001 From: Jed <4679729+jedleggett@users.noreply.github.com> Date: Wed, 17 May 2023 14:29:45 -0500 Subject: [PATCH 5/9] fix --- validator/src/admin_rpc_service.rs | 14 ++++++++------ validator/src/main.rs | 2 +- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/validator/src/admin_rpc_service.rs b/validator/src/admin_rpc_service.rs index 3c451136aa..13a43a1b25 100644 --- a/validator/src/admin_rpc_service.rs +++ b/validator/src/admin_rpc_service.rs @@ -307,9 +307,10 @@ impl AdminRpc for AdminRpcImpl { Ok(()) }) } else { - Err(jsonrpc_core::error::Error::invalid_params( - "block_engine_url must point to an http(s) connection.", - )) + Err(jsonrpc_core::error::Error::invalid_params(format!( + "invalid block_engine_url: {} - must point to an http(s) connection or empty string.", + block_engine_url + ))) } } @@ -376,9 +377,10 @@ impl AdminRpc for AdminRpcImpl { Ok(()) }) } else { - Err(jsonrpc_core::error::Error::invalid_params( - "relayer_url must point to an http(s) connection.", - )) + Err(jsonrpc_core::error::Error::invalid_params(format!( + "invalid relayer_url: {} - must point to an http(s) connection or empty string.", + relayer_url + ))) } } diff --git a/validator/src/main.rs b/validator/src/main.rs index 018d9e4d9b..e2971899ea 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -2783,7 +2783,7 @@ pub fn main() { let expected_heartbeat_interval_ms: u64 = value_of(&matches, "relayer_expected_heartbeat_interval_ms").unwrap(); assert!( - max_failed_heartbeats > 0, + expected_heartbeat_interval_ms > 0, "relayer-max-failed-heartbeats must be greater than zero" ); let expected_heartbeat_interval = Duration::from_millis(expected_heartbeat_interval_ms); From 50d5557f7aef84ae713720ce1a3c8ca305ea97da Mon Sep 17 00:00:00 2001 From: Jed <4679729+jedleggett@users.noreply.github.com> Date: Fri, 19 May 2023 17:04:00 -0500 Subject: [PATCH 6/9] feedback from 1.14 testing --- core/src/proxy/block_engine_stage.rs | 39 ++++++++++------ core/src/proxy/fetch_stage_manager.rs | 6 ++- core/src/proxy/relayer_stage.rs | 34 ++++++++------ validator/src/admin_rpc_service.rs | 58 +++++++++++------------ validator/src/main.rs | 67 ++++++++------------------- 5 files changed, 99 insertions(+), 105 deletions(-) diff --git a/core/src/proxy/block_engine_stage.rs b/core/src/proxy/block_engine_stage.rs index 5c640ea53f..5f24966b8c 100644 --- a/core/src/proxy/block_engine_stage.rs +++ b/core/src/proxy/block_engine_stage.rs @@ -149,7 +149,7 @@ impl BlockEngineStage { while !exit.load(Ordering::Relaxed) { // Wait until a valid config is supplied (either initially or by admin rpc) // Use if!/else here to avoid extra CONNECTION_BACKOFF wait on successful termination - if !Self::validate_block_engine_config(&block_engine_config.lock().unwrap()) { + if !Self::is_valid_block_engine_config(&block_engine_config.lock().unwrap()) { sleep(CONNECTION_BACKOFF).await; } else if let Err(e) = Self::connect_auth_and_stream( &block_engine_config, @@ -195,7 +195,7 @@ impl BlockEngineStage { ) -> crate::proxy::Result<()> { // Get a copy of configs here in case they have changed at runtime let keypair = cluster_info.keypair().clone(); - let local_config = &block_engine_config.lock().unwrap().clone(); + let local_config = block_engine_config.lock().unwrap().clone(); let mut auth_service_endpoint = Endpoint::from_shared(local_config.block_engine_url.clone()).map_err(|_| { @@ -213,7 +213,7 @@ impl BlockEngineStage { })? .tcp_keepalive(Some(Duration::from_secs(60))); - if local_config.block_engine_url.contains("https") { + if local_config.block_engine_url.starts_with("https") { auth_service_endpoint = auth_service_endpoint .tls_config(tonic::transport::ClientTlsConfig::new()) .map_err(|_| { @@ -230,7 +230,7 @@ impl BlockEngineStage { })?; } - debug!("connecting to auth: {:?}", &local_config.block_engine_url); + debug!("connecting to auth: {}", local_config.block_engine_url); let auth_channel = timeout(*connection_timeout, auth_service_endpoint.connect()) .await .map_err(|_| ProxyError::AuthenticationConnectionTimeout)? @@ -248,13 +248,13 @@ impl BlockEngineStage { datapoint_info!( "block_engine_stage-tokens_generated", - ("url", &local_config.block_engine_url, String), + ("url", local_config.block_engine_url, String), ("count", 1, i64), ); debug!( - "connecting to block engine: {:?}", - &local_config.block_engine_url + "connecting to block engine: {}", + local_config.block_engine_url ); let block_engine_channel = timeout(*connection_timeout, backend_endpoint.connect()) .await @@ -271,7 +271,7 @@ impl BlockEngineStage { bundle_tx, block_engine_client, packet_tx, - local_config, + &local_config, block_engine_config, verified_packet_tx, exit, @@ -291,8 +291,8 @@ impl BlockEngineStage { bundle_tx: &Sender>, mut client: BlockEngineValidatorClient>, packet_tx: &Sender, - local_config: &BlockEngineConfig, - global_config: &Arc>, + local_config: &BlockEngineConfig, // local copy of config with current connections + global_config: &Arc>, // guarded reference for detecting run-time updates verified_packet_tx: &Sender<(Vec, Option)>, exit: &Arc, block_builder_fee_info: &Arc>, @@ -366,8 +366,8 @@ impl BlockEngineStage { ), bundle_tx: &Sender>, packet_tx: &Sender, - local_config: &BlockEngineConfig, - global_config: &Arc>, + local_config: &BlockEngineConfig, // local copy of config with current connections + global_config: &Arc>, // guarded reference for detecting run-time updates verified_packet_tx: &Sender<(Vec, Option)>, exit: &Arc, block_builder_fee_info: &Arc>, @@ -532,7 +532,18 @@ impl BlockEngineStage { Ok(()) } - fn validate_block_engine_config(config: &BlockEngineConfig) -> bool { - !config.block_engine_url.is_empty() + pub fn is_valid_block_engine_config(config: &BlockEngineConfig) -> bool { + if config.block_engine_url.is_empty() { + warn!("can't connect to block_engine. missing block_engine_url."); + return false; + } + if let Err(e) = Endpoint::from_str(&config.block_engine_url) { + error!( + "can't connect to block engine. error creating block engine endpoint - {}", + e.to_string() + ); + return false; + } + true } } diff --git a/core/src/proxy/fetch_stage_manager.rs b/core/src/proxy/fetch_stage_manager.rs index 8f5b95bc2c..74551ce907 100644 --- a/core/src/proxy/fetch_stage_manager.rs +++ b/core/src/proxy/fetch_stage_manager.rs @@ -127,8 +127,10 @@ impl FetchStageManager { Self::set_tpu_addresses(&cluster_info, tpu_addr, tpu_forward_addr); } } else { - // see comment on heartbeat_sender clone in new() - unreachable!(); + { + warn!("relayer heartbeat receiver disconnected, shutting down"); + return; + } } } recv(metrics_tick) -> _ => { diff --git a/core/src/proxy/relayer_stage.rs b/core/src/proxy/relayer_stage.rs index 80d680536d..d3a349ac31 100644 --- a/core/src/proxy/relayer_stage.rs +++ b/core/src/proxy/relayer_stage.rs @@ -30,6 +30,7 @@ use { }, std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, + str::FromStr, sync::{ atomic::{AtomicBool, Ordering}, Arc, Mutex, @@ -146,7 +147,7 @@ impl RelayerStage { while !exit.load(Ordering::Relaxed) { // Wait until a valid config is supplied (either initially or by admin rpc) // Use if!/else here to avoid extra CONNECTION_BACKOFF wait on successful termination - if !Self::validate_relayer_config(&relayer_config.lock().unwrap()) { + if !Self::is_valid_relayer_config(&relayer_config.lock().unwrap()) { sleep(CONNECTION_BACKOFF).await; } else if let Err(e) = Self::connect_auth_and_stream( &relayer_config, @@ -163,7 +164,7 @@ impl RelayerStage { // This error is frequent on hot spares, and the parsed string does not work // with datapoints (incorrect escaping). ProxyError::AuthenticationPermissionDenied => { - warn!("block engine permission denied. not on leader schedule. ignore if hot-spare.") + warn!("relayer permission denied. not on leader schedule. ignore if hot-spare.") } e => { error_count += 1; @@ -207,7 +208,7 @@ impl RelayerStage { )) })? .tcp_keepalive(Some(Duration::from_secs(60))); - if local_config.relayer_url.contains("https") { + if local_config.relayer_url.starts_with("https") { auth_service_endpoint = auth_service_endpoint .tls_config(tonic::transport::ClientTlsConfig::new()) .map_err(|_| { @@ -224,7 +225,7 @@ impl RelayerStage { })?; } - debug!("connecting to auth: {:?}", local_config.relayer_url); + debug!("connecting to auth: {}", local_config.relayer_url); let auth_channel = timeout(*connection_timeout, auth_service_endpoint.connect()) .await .map_err(|_| ProxyError::AuthenticationConnectionTimeout)? @@ -246,7 +247,7 @@ impl RelayerStage { ("count", 1, i64), ); - debug!("connecting to relayer: {:?}", local_config.relayer_url); + debug!("connecting to relayer: {}", local_config.relayer_url); let relayer_channel = timeout(*connection_timeout, backend_endpoint.connect()) .await .map_err(|_| ProxyError::RelayerConnectionTimeout)? @@ -282,8 +283,8 @@ impl RelayerStage { heartbeat_tx: &Sender, packet_tx: &Sender, verified_packet_tx: &Sender<(Vec, Option)>, - local_config: &RelayerConfig, - global_config: &Arc>, + local_config: &RelayerConfig, // local copy of config with current connections + global_config: &Arc>, // guarded reference for detecting run-time updates exit: &Arc, auth_client: AuthServiceClient, access_token: Arc>, @@ -351,8 +352,8 @@ impl RelayerStage { heartbeat_tx: &Sender, mut packet_stream: Streaming, packet_tx: &Sender, - local_config: &RelayerConfig, - global_config: &Arc>, + local_config: &RelayerConfig, // local copy of config with current connections + global_config: &Arc>, // guarded reference for detecting run-time updates verified_packet_tx: &Sender<(Vec, Option)>, exit: &Arc, mut auth_client: AuthServiceClient, @@ -483,17 +484,24 @@ impl RelayerStage { Ok(()) } - fn validate_relayer_config(config: &RelayerConfig) -> bool { + pub fn is_valid_relayer_config(config: &RelayerConfig) -> bool { if config.relayer_url.is_empty() { - warn!("Can't connect to relayer. Missing or invalid url."); + warn!("can't connect to relayer. missing relayer_url."); return false; } if config.oldest_allowed_heartbeat.is_zero() { - warn!("Relayer oldest allowed heartbeat must be greater than 0."); + error!("can't connect to relayer. oldest allowed heartbeat must be greater than 0."); return false; } if config.expected_heartbeat_interval.is_zero() { - warn!("Relayer expected heartbeat interval must be greater than 0."); + error!("can't connect to relayer. expected heartbeat interval must be greater than 0."); + return false; + } + if let Err(e) = Endpoint::from_str(&config.relayer_url) { + error!( + "can't connect to relayer. error creating relayer endpoint - {}", + e.to_string() + ); return false; } true diff --git a/validator/src/admin_rpc_service.rs b/validator/src/admin_rpc_service.rs index 13a43a1b25..93a065192f 100644 --- a/validator/src/admin_rpc_service.rs +++ b/validator/src/admin_rpc_service.rs @@ -8,7 +8,10 @@ use { serde::{de::Deserializer, Deserialize, Serialize}, solana_core::{ consensus::Tower, - proxy::{block_engine_stage::BlockEngineConfig, relayer_stage::RelayerConfig}, + proxy::{ + block_engine_stage::{BlockEngineConfig, BlockEngineStage}, + relayer_stage::{RelayerConfig, RelayerStage}, + }, tower_storage::TowerStorage, validator::ValidatorStartProgress, }, @@ -297,20 +300,20 @@ impl AdminRpc for AdminRpcImpl { trust_packets: bool, ) -> Result<()> { debug!("set_block_engine_config request received"); - - if block_engine_url.contains("http") || block_engine_url.is_empty() { + let config = BlockEngineConfig { + block_engine_url, + trust_packets, + }; + // Detailed log messages are printed inside validate function + if BlockEngineStage::is_valid_block_engine_config(&config) { meta.with_post_init(|post_init| { - *post_init.block_engine_config.lock().unwrap() = BlockEngineConfig { - block_engine_url, - trust_packets, - }; + *post_init.block_engine_config.lock().unwrap() = config; Ok(()) }) } else { - Err(jsonrpc_core::error::Error::invalid_params(format!( - "invalid block_engine_url: {} - must point to an http(s) connection or empty string.", - block_engine_url - ))) + Err(jsonrpc_core::error::Error::invalid_params( + "failed to set block engine config. see logs for details.", + )) } } @@ -359,28 +362,25 @@ impl AdminRpc for AdminRpcImpl { max_failed_heartbeats: u64, ) -> Result<()> { debug!("set_relayer_config request received"); - - if relayer_url.contains("http") || relayer_url.is_empty() { + let expected_heartbeat_interval = Duration::from_millis(expected_heartbeat_interval_ms); + let oldest_allowed_heartbeat = + Duration::from_millis(max_failed_heartbeats * expected_heartbeat_interval_ms); + let config = RelayerConfig { + relayer_url, + expected_heartbeat_interval, + oldest_allowed_heartbeat, + trust_packets, + }; + // Detailed log messages are printed inside validate function + if RelayerStage::is_valid_relayer_config(&config) { meta.with_post_init(|post_init| { - let expected_heartbeat_interval = - Duration::from_millis(expected_heartbeat_interval_ms); - - let oldest_allowed_heartbeat = - Duration::from_millis(max_failed_heartbeats * expected_heartbeat_interval_ms); - - *post_init.relayer_config.lock().unwrap() = RelayerConfig { - relayer_url, - expected_heartbeat_interval, - oldest_allowed_heartbeat, - trust_packets, - }; + *post_init.relayer_config.lock().unwrap() = config; Ok(()) }) } else { - Err(jsonrpc_core::error::Error::invalid_params(format!( - "invalid relayer_url: {} - must point to an http(s) connection or empty string.", - relayer_url - ))) + Err(jsonrpc_core::error::Error::invalid_params( + "failed to set relayer config. see logs for details.", + )) } } diff --git a/validator/src/main.rs b/validator/src/main.rs index e2971899ea..2e74fb639e 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -2763,39 +2763,40 @@ pub fn main() { let voting_disabled = matches.is_present("no_voting") || restricted_repair_only_mode; let tip_manager_config = tip_manager_config_from_matches(&matches, voting_disabled); - let mut block_engine_config = BlockEngineConfig { + let block_engine_config = BlockEngineConfig { + block_engine_url: if matches.is_present("block_engine_url") { + value_of(&matches, "block_engine_url").expect("couldn't parse block_engine_url") + } else { + "".to_string() + }, trust_packets: matches.is_present("trust_block_engine_packets"), - ..Default::default() }; - if matches.is_present("block_engine_url") { - block_engine_config.block_engine_url = - value_of(&matches, "block_engine_url").expect("couldn't parse block_engine_url"); - } - let mut relayer_config = RelayerConfig { - trust_packets: matches.is_present("trust_relayer_packets"), - ..Default::default() - }; - if matches.is_present("relayer_url") { - relayer_config.relayer_url = - value_of(&matches, "relayer_url").expect("couldn't parse relayer_url"); - } + // Defaults are set in cli definition, safe to use unwrap() here let expected_heartbeat_interval_ms: u64 = value_of(&matches, "relayer_expected_heartbeat_interval_ms").unwrap(); assert!( expected_heartbeat_interval_ms > 0, "relayer-max-failed-heartbeats must be greater than zero" ); - let expected_heartbeat_interval = Duration::from_millis(expected_heartbeat_interval_ms); - relayer_config.expected_heartbeat_interval = expected_heartbeat_interval; - let max_failed_heartbeats: u64 = value_of(&matches, "relayer_max_failed_heartbeats").unwrap(); assert!( max_failed_heartbeats > 0, "relayer-max-failed-heartbeats must be greater than zero" ); - relayer_config.oldest_allowed_heartbeat = - Duration::from_millis(max_failed_heartbeats * expected_heartbeat_interval_ms); + + let relayer_config = RelayerConfig { + relayer_url: if matches.is_present("relayer_url") { + value_of(&matches, "relayer_url").expect("couldn't parse relayer_url") + } else { + "".to_string() + }, + expected_heartbeat_interval: Duration::from_millis(expected_heartbeat_interval_ms), + oldest_allowed_heartbeat: Duration::from_millis( + max_failed_heartbeats * expected_heartbeat_interval_ms, + ), + trust_packets: matches.is_present("trust_relayer_packets"), + }; let mut validator_config = ValidatorConfig { require_tower: matches.is_present("require_tower"), @@ -3452,30 +3453,6 @@ fn process_account_indexes(matches: &ArgMatches) -> AccountSecondaryIndexes { // avoid breaking validator startup commands fn get_deprecated_arguments() -> Vec> { vec![ - Arg::with_name("block_engine_address") - .long("block-engine-address") - .value_name("block_engine_address") - .takes_value(true) - .help("Address of the block engine") - .conflicts_with("block_engine_url"), - Arg::with_name("block_engine_auth_service_address") - .long("block-engine-auth-service-address") - .value_name("block_engine_auth_service_address") - .takes_value(true) - .help("Address of the block engine's authentication service.") - .conflicts_with("block_engine_url"), - Arg::with_name("relayer_auth_service_address") - .long("relayer-auth-service-address") - .value_name("relayer_auth_service_address") - .takes_value(true) - .help("Address of the block engine's authentication service.") - .conflicts_with("relayer_url"), - Arg::with_name("relayer_address") - .long("relayer-address") - .value_name("relayer_address") - .takes_value(true) - .help("Address of the relayer") - .conflicts_with("relayer_url"), Arg::with_name("accounts_db_caching_enabled") .long("accounts-db-caching-enabled") .conflicts_with("no_accounts_db_caching") @@ -3580,10 +3557,6 @@ lazy_static! { "Vote account sanity checks are no longer performed by default.", ), ("no_rocksdb_compaction", ""), - ("block_engine_address", "You can now use a single endpoint to connect to the block-engine. Please use block-engine-url."), - ("block_engine_auth_service_address", "You can now use a single endpoint to connect to the block-engine. Please use block-engine-url."), - ("relayer_address", ""), - ("relayer_auth_service_address", ""), ]; } From 3f71e62885d361110c498065a6566a1c3c136cdc Mon Sep 17 00:00:00 2001 From: Jed <4679729+jedleggett@users.noreply.github.com> Date: Fri, 19 May 2023 17:47:41 -0500 Subject: [PATCH 7/9] cleanup --- core/src/proxy/block_engine_stage.rs | 17 +---------------- core/src/proxy/relayer_stage.rs | 16 +--------------- validator/src/main.rs | 5 ++--- 3 files changed, 4 insertions(+), 34 deletions(-) diff --git a/core/src/proxy/block_engine_stage.rs b/core/src/proxy/block_engine_stage.rs index 5f24966b8c..a47f1b18b1 100644 --- a/core/src/proxy/block_engine_stage.rs +++ b/core/src/proxy/block_engine_stage.rs @@ -197,13 +197,6 @@ impl BlockEngineStage { let keypair = cluster_info.keypair().clone(); let local_config = block_engine_config.lock().unwrap().clone(); - let mut auth_service_endpoint = - Endpoint::from_shared(local_config.block_engine_url.clone()).map_err(|_| { - ProxyError::AuthenticationConnectionError(format!( - "invalid block engine url value: {}", - local_config.block_engine_url - )) - })?; let mut backend_endpoint = Endpoint::from_shared(local_config.block_engine_url.clone()) .map_err(|_| { ProxyError::BlockEngineConnectionError(format!( @@ -212,15 +205,7 @@ impl BlockEngineStage { )) })? .tcp_keepalive(Some(Duration::from_secs(60))); - if local_config.block_engine_url.starts_with("https") { - auth_service_endpoint = auth_service_endpoint - .tls_config(tonic::transport::ClientTlsConfig::new()) - .map_err(|_| { - ProxyError::AuthenticationConnectionError( - "failed to set tls_config for block engine auth service".to_string(), - ) - })?; backend_endpoint = backend_endpoint .tls_config(tonic::transport::ClientTlsConfig::new()) .map_err(|_| { @@ -231,7 +216,7 @@ impl BlockEngineStage { } debug!("connecting to auth: {}", local_config.block_engine_url); - let auth_channel = timeout(*connection_timeout, auth_service_endpoint.connect()) + let auth_channel = timeout(*connection_timeout, backend_endpoint.connect()) .await .map_err(|_| ProxyError::AuthenticationConnectionTimeout)? .map_err(|e| ProxyError::AuthenticationConnectionError(e.to_string()))?; diff --git a/core/src/proxy/relayer_stage.rs b/core/src/proxy/relayer_stage.rs index d3a349ac31..7befc343fb 100644 --- a/core/src/proxy/relayer_stage.rs +++ b/core/src/proxy/relayer_stage.rs @@ -193,13 +193,6 @@ impl RelayerStage { let keypair = cluster_info.keypair().clone(); let local_config = relayer_config.lock().unwrap().clone(); - let mut auth_service_endpoint = Endpoint::from_shared(local_config.relayer_url.clone()) - .map_err(|_| { - ProxyError::AuthenticationConnectionError(format!( - "invalid relayer url value: {}", - local_config.relayer_url - )) - })?; let mut backend_endpoint = Endpoint::from_shared(local_config.relayer_url.clone()) .map_err(|_| { ProxyError::RelayerConnectionError(format!( @@ -209,13 +202,6 @@ impl RelayerStage { })? .tcp_keepalive(Some(Duration::from_secs(60))); if local_config.relayer_url.starts_with("https") { - auth_service_endpoint = auth_service_endpoint - .tls_config(tonic::transport::ClientTlsConfig::new()) - .map_err(|_| { - ProxyError::AuthenticationConnectionError( - "failed to set tls_config for relayer auth service".to_string(), - ) - })?; backend_endpoint = backend_endpoint .tls_config(tonic::transport::ClientTlsConfig::new()) .map_err(|_| { @@ -226,7 +212,7 @@ impl RelayerStage { } debug!("connecting to auth: {}", local_config.relayer_url); - let auth_channel = timeout(*connection_timeout, auth_service_endpoint.connect()) + let auth_channel = timeout(*connection_timeout, backend_endpoint.connect()) .await .map_err(|_| ProxyError::AuthenticationConnectionTimeout)? .map_err(|e| ProxyError::AuthenticationConnectionError(e.to_string()))?; diff --git a/validator/src/main.rs b/validator/src/main.rs index 2e74fb639e..3c75eed266 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1,5 +1,4 @@ #![allow(clippy::integer_arithmetic)] - #[cfg(not(target_env = "msvc"))] use jemallocator::Jemalloc; use { @@ -1788,13 +1787,13 @@ pub fn main() { .arg( Arg::with_name("block_engine_url") .long("block-engine-url") - .help("Block engine url") + .help("Block engine url. Set to empty string to disable block engine connection.") .takes_value(true) ) .arg( Arg::with_name("relayer_url") .long("relayer-url") - .help("Relayer url") + .help("Relayer url. Set to empty string to disable relayer connection.") .takes_value(true) ) .arg( From 0650015c6bebaeb0ae622a11f2fa0e8b904f08f1 Mon Sep 17 00:00:00 2001 From: Jed <4679729+jedleggett@users.noreply.github.com> Date: Sat, 20 May 2023 12:14:26 -0500 Subject: [PATCH 8/9] squash --- core/benches/cluster_info.rs | 2 +- core/benches/retransmit_stage.rs | 2 +- core/src/broadcast_stage.rs | 21 ++++++------ .../broadcast_duplicates_run.rs | 4 +-- .../broadcast_fake_shreds_run.rs | 3 +- .../fail_entry_verification_broadcast_run.rs | 4 +-- .../broadcast_stage/standard_broadcast_run.rs | 6 ++-- core/src/retransmit_stage.rs | 16 ++++----- core/src/tpu.rs | 2 +- core/src/tvu.rs | 2 +- core/src/validator.rs | 9 ++--- local-cluster/src/validator_configs.rs | 2 +- validator/src/admin_rpc_service.rs | 23 +++++++++++++ validator/src/bin/solana-test-validator.rs | 1 + validator/src/main.rs | 34 ++++++++++++++++--- 15 files changed, 91 insertions(+), 40 deletions(-) diff --git a/core/benches/cluster_info.rs b/core/benches/cluster_info.rs index 84e9d5685e..96092e5985 100644 --- a/core/benches/cluster_info.rs +++ b/core/benches/cluster_info.rs @@ -79,7 +79,7 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) { &cluster_info, &bank_forks, &SocketAddrSpace::Unspecified, - None, + &None, ) .unwrap(); }); diff --git a/core/benches/retransmit_stage.rs b/core/benches/retransmit_stage.rs index c6bec24fbd..5d309d9801 100644 --- a/core/benches/retransmit_stage.rs +++ b/core/benches/retransmit_stage.rs @@ -121,7 +121,7 @@ fn bench_retransmitter(bencher: &mut Bencher) { shreds_receiver, Arc::default(), // solana_rpc::max_slots::MaxSlots None, - None, + Arc::new(RwLock::new(None)), ); let mut index = 0; diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 1d34ec7ac8..d242ffa924 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -86,7 +86,7 @@ impl BroadcastStageType { blockstore: Arc, bank_forks: Arc>, shred_version: u16, - shred_receiver_addr: Option, + shred_receiver_address: Arc>>, ) -> BroadcastStage { match self { BroadcastStageType::Standard => BroadcastStage::new( @@ -98,7 +98,7 @@ impl BroadcastStageType { blockstore, bank_forks, StandardBroadcastRun::new(shred_version), - shred_receiver_addr, + shred_receiver_address, ), BroadcastStageType::FailEntryVerification => BroadcastStage::new( @@ -110,7 +110,7 @@ impl BroadcastStageType { blockstore, bank_forks, FailEntryVerificationBroadcastRun::new(shred_version), - None, + Arc::new(RwLock::new(None)), ), BroadcastStageType::BroadcastFakeShreds => BroadcastStage::new( @@ -122,7 +122,7 @@ impl BroadcastStageType { blockstore, bank_forks, BroadcastFakeShredsRun::new(0, shred_version), - None, + Arc::new(RwLock::new(None)), ), BroadcastStageType::BroadcastDuplicates(config) => BroadcastStage::new( @@ -134,7 +134,7 @@ impl BroadcastStageType { blockstore, bank_forks, BroadcastDuplicatesRun::new(shred_version, config.clone()), - None, + Arc::new(RwLock::new(None)), ), } } @@ -155,7 +155,7 @@ trait BroadcastRun { cluster_info: &ClusterInfo, sock: &UdpSocket, bank_forks: &RwLock, - shred_receiver_addr: Option, + shred_receiver_address: &Arc>>, ) -> Result<()>; fn record(&mut self, receiver: &Mutex, blockstore: &Blockstore) -> Result<()>; } @@ -251,7 +251,7 @@ impl BroadcastStage { blockstore: Arc, bank_forks: Arc>, broadcast_stage_run: impl BroadcastRun + Send + 'static + Clone, - shred_receiver_addr: Option, + shred_receiver_address: Arc>>, ) -> Self { let (socket_sender, socket_receiver) = unbounded(); let (blockstore_sender, blockstore_receiver) = unbounded(); @@ -283,6 +283,7 @@ impl BroadcastStage { let mut bs_transmit = broadcast_stage_run.clone(); let cluster_info = cluster_info.clone(); let bank_forks = bank_forks.clone(); + let shred_receiver_address = shred_receiver_address.clone(); let t = Builder::new() .name("solBroadcastTx".to_string()) .spawn(move || loop { @@ -291,7 +292,7 @@ impl BroadcastStage { &cluster_info, &sock, &bank_forks, - shred_receiver_addr, + &shred_receiver_address, ); let res = Self::handle_error(res, "solana-broadcaster-transmit"); if let Some(res) = res { @@ -408,7 +409,7 @@ pub fn broadcast_shreds( cluster_info: &ClusterInfo, bank_forks: &RwLock, socket_addr_space: &SocketAddrSpace, - shred_receiver_addr: Option, + shred_receiver_address: &Option, ) -> Result<()> { let mut result = Ok(()); let mut shred_select = Measure::start("shred_select"); @@ -418,7 +419,7 @@ pub fn broadcast_shreds( }; let packets: Vec<_> = shreds .iter() - .filter_map(|s| Some((s.payload(), shred_receiver_addr?))) + .filter_map(|s| Some((s.payload(), (*shred_receiver_address)?))) .chain( shreds .iter() diff --git a/core/src/broadcast_stage/broadcast_duplicates_run.rs b/core/src/broadcast_stage/broadcast_duplicates_run.rs index 3aee2d123d..4c32ca938e 100644 --- a/core/src/broadcast_stage/broadcast_duplicates_run.rs +++ b/core/src/broadcast_stage/broadcast_duplicates_run.rs @@ -10,7 +10,7 @@ use { signature::{Keypair, Signature, Signer}, system_transaction, }, - std::{collections::HashSet, net::SocketAddr}, + std::collections::HashSet, }; pub const MINIMUM_DUPLICATE_SLOT: Slot = 20; @@ -266,7 +266,7 @@ impl BroadcastRun for BroadcastDuplicatesRun { cluster_info: &ClusterInfo, sock: &UdpSocket, bank_forks: &RwLock, - _shred_receiver_addr: Option, + _shred_receiver_addr: &Arc>>, ) -> Result<()> { let (shreds, _) = receiver.lock().unwrap().recv()?; if shreds.is_empty() { diff --git a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs index 867a424f91..9f9fefffce 100644 --- a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs +++ b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs @@ -3,7 +3,6 @@ use { solana_entry::entry::Entry, solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shredder}, solana_sdk::{hash::Hash, signature::Keypair}, - std::net::SocketAddr, }; #[derive(Clone)] @@ -133,7 +132,7 @@ impl BroadcastRun for BroadcastFakeShredsRun { cluster_info: &ClusterInfo, sock: &UdpSocket, _bank_forks: &RwLock, - _shred_receiver_addr: Option, + _shred_receiver_addr: &Arc>>, ) -> Result<()> { for (data_shreds, batch_info) in receiver.lock().unwrap().iter() { let fake = batch_info.is_some(); diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index 57cb9532da..61ea979402 100644 --- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -162,7 +162,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { cluster_info: &ClusterInfo, sock: &UdpSocket, bank_forks: &RwLock, - shred_receiver_addr: Option, + shred_receiver_address: &Arc>>, ) -> Result<()> { let (shreds, _) = receiver.lock().unwrap().recv()?; broadcast_shreds( @@ -174,7 +174,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { cluster_info, bank_forks, cluster_info.socket_addr_space(), - shred_receiver_addr, + &shred_receiver_address.read().unwrap(), ) } fn record(&mut self, receiver: &Mutex, blockstore: &Blockstore) -> Result<()> { diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index ea8a8b62b4..c09167cb5a 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -378,7 +378,7 @@ impl StandardBroadcastRun { shreds: Arc>, broadcast_shred_batch_info: Option, bank_forks: &RwLock, - shred_receiver_addr: Option, + shred_receiver_addr: &Option, ) -> Result<()> { trace!("Broadcasting {:?} shreds", shreds.len()); let mut transmit_stats = TransmitShredsStats::default(); @@ -464,7 +464,7 @@ impl BroadcastRun for StandardBroadcastRun { cluster_info: &ClusterInfo, sock: &UdpSocket, bank_forks: &RwLock, - shred_receiver_addr: Option, + shred_receiver_address: &Arc>>, ) -> Result<()> { let (shreds, batch_info) = receiver.lock().unwrap().recv()?; self.broadcast( @@ -473,7 +473,7 @@ impl BroadcastRun for StandardBroadcastRun { shreds, batch_info, bank_forks, - shred_receiver_addr, + &shred_receiver_address.read().unwrap(), ) } fn record(&mut self, receiver: &Mutex, blockstore: &Blockstore) -> Result<()> { diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 4b8fadd4c4..a6ec286418 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -178,7 +178,7 @@ fn retransmit( packet_hasher: &mut PacketHasher, max_slots: &MaxSlots, rpc_subscriptions: Option<&RpcSubscriptions>, - shred_receiver_addr: Option, + shred_receiver_address: &Arc>>, ) -> Result<(), RecvTimeoutError> { const RECV_TIMEOUT: Duration = Duration::from_secs(1); let mut shreds = shreds_receiver.recv_timeout(RECV_TIMEOUT)?; @@ -257,7 +257,7 @@ fn retransmit( socket_addr_space, &sockets[index % sockets.len()], stats, - shred_receiver_addr, + &shred_receiver_address.read().unwrap(), ); (key.slot(), root_distance, num_nodes) }) @@ -277,7 +277,7 @@ fn retransmit( socket_addr_space, &sockets[index % sockets.len()], stats, - shred_receiver_addr, + &shred_receiver_address.read().unwrap(), ); (key.slot(), root_distance, num_nodes) }) @@ -301,14 +301,14 @@ fn retransmit_shred( socket_addr_space: &SocketAddrSpace, socket: &UdpSocket, stats: &RetransmitStats, - shred_receiver_addr: Option, + shred_receiver_addr: &Option, ) -> (/*root_distance:*/ usize, /*num_nodes:*/ usize) { let mut compute_turbine_peers = Measure::start("turbine_start"); let (root_distance, mut addrs) = cluster_nodes.get_retransmit_addrs(slot_leader, key, root_bank, DATA_PLANE_FANOUT); if let Some(addr) = shred_receiver_addr { - addrs.push(addr); + addrs.push(*addr); } let addrs: Vec<_> = addrs @@ -360,7 +360,7 @@ pub fn retransmitter( shreds_receiver: Receiver>>, max_slots: Arc, rpc_subscriptions: Option>, - shred_receiver_addr: Option, + shred_receiver_addr: Arc>>, ) -> JoinHandle<()> { let cluster_nodes_cache = ClusterNodesCache::::new( CLUSTER_NODES_CACHE_NUM_EPOCH_CAP, @@ -393,7 +393,7 @@ pub fn retransmitter( &mut packet_hasher, &max_slots, rpc_subscriptions.as_deref(), - shred_receiver_addr, + &shred_receiver_addr, ) { Ok(()) => (), Err(RecvTimeoutError::Timeout) => (), @@ -416,7 +416,7 @@ impl RetransmitStage { retransmit_receiver: Receiver>>, max_slots: Arc, rpc_subscriptions: Option>, - shred_receiver_addr: Option, + shred_receiver_addr: Arc>>, ) -> Self { let retransmit_thread_handle = retransmitter( retransmit_sockets, diff --git a/core/src/tpu.rs b/core/src/tpu.rs index d112125613..2753ef6b0b 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -116,7 +116,7 @@ impl Tpu { block_engine_config: Arc>, relayer_config: Arc>, tip_manager_config: TipManagerConfig, - shred_receiver_address: Option, + shred_receiver_address: Arc>>, shared_staked_nodes_overrides: Arc>>, tpu_enable_udp: bool, preallocated_bundle_cost: u64, diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 0b0ded03ac..2b2433a690 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -131,7 +131,7 @@ impl Tvu { log_messages_bytes_limit: Option, connection_cache: &Arc, prioritization_fee_cache: &Arc, - shred_receiver_addr: Option, + shred_receiver_addr: Arc>>, ) -> Result { let TvuSockets { repair: repair_socket, diff --git a/core/src/validator.rs b/core/src/validator.rs index 5f5e6fca21..cc4f9aa06c 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -182,7 +182,8 @@ pub struct ValidatorConfig { pub replay_slots_concurrently: bool, pub relayer_config: Arc>, pub block_engine_config: Arc>, - pub shred_receiver_address: Option, + // Using Option inside RwLock is ugly, but only convenient way to allow toggle on/off + pub shred_receiver_address: Arc>>, pub tip_manager_config: TipManagerConfig, pub preallocated_bundle_cost: u64, } @@ -252,7 +253,7 @@ impl Default for ValidatorConfig { replay_slots_concurrently: false, relayer_config: Arc::new(Mutex::new(RelayerConfig::default())), block_engine_config: Arc::new(Mutex::new(BlockEngineConfig::default())), - shred_receiver_address: None, + shred_receiver_address: Arc::new(RwLock::new(None)), tip_manager_config: TipManagerConfig::default(), preallocated_bundle_cost: u64::default(), } @@ -1005,7 +1006,7 @@ impl Validator { config.runtime_config.log_messages_bytes_limit, &connection_cache, &prioritization_fee_cache, - config.shred_receiver_address, + config.shred_receiver_address.clone(), )?; let tpu = Tpu::new( @@ -1044,7 +1045,7 @@ impl Validator { config.block_engine_config.clone(), config.relayer_config.clone(), config.tip_manager_config.clone(), - config.shred_receiver_address, + config.shred_receiver_address.clone(), config.staked_nodes_overrides.clone(), tpu_enable_udp, config.preallocated_bundle_cost, diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index a75c45e41f..291e6d4525 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -69,7 +69,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { replay_slots_concurrently: config.replay_slots_concurrently, relayer_config: config.relayer_config.clone(), block_engine_config: config.block_engine_config.clone(), - shred_receiver_address: config.shred_receiver_address, + shred_receiver_address: config.shred_receiver_address.clone(), tip_manager_config: config.tip_manager_config.clone(), preallocated_bundle_cost: config.preallocated_bundle_cost, } diff --git a/validator/src/admin_rpc_service.rs b/validator/src/admin_rpc_service.rs index 93a065192f..a7283bd501 100644 --- a/validator/src/admin_rpc_service.rs +++ b/validator/src/admin_rpc_service.rs @@ -28,6 +28,7 @@ use { fmt::{self, Display}, net::SocketAddr, path::{Path, PathBuf}, + str::FromStr, sync::{Arc, Mutex, RwLock}, thread::{self, Builder}, time::{Duration, SystemTime}, @@ -41,6 +42,7 @@ pub struct AdminRpcRequestMetadataPostInit { pub vote_account: Pubkey, pub relayer_config: Arc>, pub block_engine_config: Arc>, + pub shred_receiver_address: Arc>>, } #[derive(Clone)] @@ -209,6 +211,9 @@ pub trait AdminRpc { expected_heartbeat_interval_ms: u64, max_failed_heartbeats: u64, ) -> Result<()>; + + #[rpc(meta, name = "setShredReceiverAddress")] + fn set_shred_receiver_address(&self, meta: Self::Metadata, addr: String) -> Result<()>; } pub struct AdminRpcImpl; @@ -384,6 +389,24 @@ impl AdminRpc for AdminRpcImpl { } } + fn set_shred_receiver_address(&self, meta: Self::Metadata, addr: String) -> Result<()> { + let shred_receiver_address = if addr.is_empty() { + None + } else { + Some(SocketAddr::from_str(&addr).map_err(|_| { + jsonrpc_core::error::Error::invalid_params(format!( + "invalid shred receiver address: {}", + addr + )) + })?) + }; + + meta.with_post_init(|post_init| { + *post_init.shred_receiver_address.write().unwrap() = shred_receiver_address; + Ok(()) + }) + } + fn set_staked_nodes_overrides(&self, meta: Self::Metadata, path: String) -> Result<()> { let loaded_config = load_staked_nodes_overrides(&path) .map_err(|err| { diff --git a/validator/src/bin/solana-test-validator.rs b/validator/src/bin/solana-test-validator.rs index b224cb04f0..3fc22fec61 100644 --- a/validator/src/bin/solana-test-validator.rs +++ b/validator/src/bin/solana-test-validator.rs @@ -863,6 +863,7 @@ fn main() { vote_account: test_validator.vote_account_address(), block_engine_config: Arc::new(Mutex::new(BlockEngineConfig::default())), relayer_config: Arc::new(Mutex::new(RelayerConfig::default())), + shred_receiver_address: Arc::new(RwLock::new(None)), }); if let Some(dashboard) = dashboard { dashboard.run(Duration::from_millis(250)); diff --git a/validator/src/main.rs b/validator/src/main.rs index 3c75eed266..b07cf6ed8c 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1863,7 +1863,7 @@ pub fn main() { .long("shred-receiver-address") .value_name("SHRED_RECEIVER_ADDRESS") .takes_value(true) - .help("Shred receiver listening address") + .help("Validator will forward all shreds to this address in addition to normal turbine operation. Set to empty string to disable.") ) .arg( Arg::with_name("log_messages_bytes_limit") @@ -2056,6 +2056,18 @@ pub fn main() { .default_value(DEFAULT_RELAYER_MAX_FAILED_HEARTBEATS) ) ) + .subcommand( + SubCommand::with_name("set-shred-receiver-address") + .about("Changes shred receiver address") + .arg( + Arg::with_name("shred_receiver_address") + .long("shred-receiver-address") + .value_name("SHRED_RECEIVER_ADDRESS") + .takes_value(true) + .help("Validator will forward all shreds to this address in addition to normal turbine operation. Set to empty string to disable.") + .required(true) + ) + ) .subcommand( SubCommand::with_name("staked-nodes-overrides") .about("Overrides stakes of specific node identities.") @@ -2384,6 +2396,17 @@ pub fn main() { }); return; } + ("set-shred-receiver-address", Some(subcommand_matches)) => { + let addr = value_t_or_exit!(subcommand_matches, "shred_receiver_address", String); + let admin_client = admin_rpc_service::connect(&ledger_path); + admin_rpc_service::runtime() + .block_on(async move { admin_client.await?.set_shred_receiver_address(addr).await }) + .unwrap_or_else(|err| { + println!("set shred receiver address failed: {}", err); + exit(1); + }); + return; + } ("wait-for-restart-window", Some(subcommand_matches)) => { let min_idle_time = value_t_or_exit!(subcommand_matches, "min_idle_time", usize); let identity = pubkey_of(subcommand_matches, "identity"); @@ -2933,9 +2956,11 @@ pub fn main() { relayer_config: Arc::new(Mutex::new(relayer_config)), block_engine_config: Arc::new(Mutex::new(block_engine_config)), tip_manager_config, - shred_receiver_address: matches - .value_of("shred_receiver_address") - .map(|address| SocketAddr::from_str(address).expect("shred_receiver_address invalid")), + shred_receiver_address: Arc::new(RwLock::new( + matches + .value_of("shred_receiver_address") + .map(|addr| SocketAddr::from_str(addr).expect("shred_receiver_address invalid")), + )), staked_nodes_overrides: staked_nodes_overrides.clone(), replay_slots_concurrently: matches.is_present("replay_slots_concurrently"), preallocated_bundle_cost: value_of(&matches, "preallocated_bundle_cost") @@ -3386,6 +3411,7 @@ pub fn main() { vote_account, relayer_config: validator_config.relayer_config, block_engine_config: validator_config.block_engine_config, + shred_receiver_address: validator_config.shred_receiver_address, }); if let Some(filename) = init_complete_file { From 4f2338ee5d70e4f12205b4ea4301a9543dadf68b Mon Sep 17 00:00:00 2001 From: Jed <4679729+jedleggett@users.noreply.github.com> Date: Sat, 20 May 2023 13:07:03 -0500 Subject: [PATCH 9/9] fix tests --- core/src/broadcast_stage.rs | 2 +- .../broadcast_stage/standard_broadcast_run.rs | 16 ++++++++++++++-- core/src/tvu.rs | 2 +- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index d242ffa924..87b9fc9e26 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -636,7 +636,7 @@ pub mod test { blockstore.clone(), bank_forks, StandardBroadcastRun::new(0), - None, + Arc::new(RwLock::new(None)), ); MockBroadcastStage { diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index c09167cb5a..6d147940e2 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -180,10 +180,22 @@ impl StandardBroadcastRun { let brecv = Arc::new(Mutex::new(brecv)); //data - let _ = self.transmit(&srecv, cluster_info, sock, bank_forks, None); + let _ = self.transmit( + &srecv, + cluster_info, + sock, + bank_forks, + &Arc::new(RwLock::new(None)), + ); let _ = self.record(&brecv, blockstore); //coding - let _ = self.transmit(&srecv, cluster_info, sock, bank_forks, None); + let _ = self.transmit( + &srecv, + cluster_info, + sock, + bank_forks, + &Arc::new(RwLock::new(None)), + ); let _ = self.record(&brecv, blockstore); Ok(()) } diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 2b2433a690..c66ce67ed4 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -458,7 +458,7 @@ pub mod tests { None, &Arc::new(ConnectionCache::default()), &_ignored_prioritization_fee_cache, - None, + Arc::new(RwLock::new(None)), ) .expect("assume success"); exit.store(true, Ordering::Relaxed);