diff --git a/applications/minotari_node/src/config.rs b/applications/minotari_node/src/config.rs
index f209e5b23a..044caee2cd 100644
--- a/applications/minotari_node/src/config.rs
+++ b/applications/minotari_node/src/config.rs
@@ -142,7 +142,7 @@ pub struct BaseNodeConfig {
     pub state_machine: BaseNodeStateMachineConfig,
     /// Obscure GRPC error responses
     pub report_grpc_error: bool,
-    /// Interval to check if the base node is still in sync with the network
+    // Interval to check if the base node is still in sync with the network
     #[serde(with = "serializers::seconds")]
     pub tari_pulse_interval: Duration,
 }
diff --git a/base_layer/core/src/base_node/tari_pulse_service/mod.rs b/base_layer/core/src/base_node/tari_pulse_service/mod.rs
index 5d2a277533..0d7492b686 100644
--- a/base_layer/core/src/base_node/tari_pulse_service/mod.rs
+++ b/base_layer/core/src/base_node/tari_pulse_service/mod.rs
@@ -33,7 +33,7 @@ use hickory_client::{
     rr::{DNSClass, Name, RData, Record, RecordType},
     tcp::TcpClientStream,
 };
-use log::{error, info};
+use log::{error, info, warn};
 use serde::{Deserialize, Serialize};
 use tari_p2p::Network;
 use tari_service_framework::{async_trait, ServiceInitializationError, ServiceInitializer, ServiceInitializerContext};
@@ -75,13 +75,19 @@ fn get_network_dns_name(network: Network) -> Name {
 pub struct TariPulseService {
     dns_name: Name,
     config: TariPulseConfig,
+    shutdown_signal: ShutdownSignal,
 }
 
 impl TariPulseService {
-    pub async fn new(config: TariPulseConfig) -> Result {
+    /// Builds the service for `config.network`, keeping `shutdown_signal` so that `run` can stop when it fires.
+    pub async fn new(config: TariPulseConfig, shutdown_signal: ShutdownSignal) -> Result {
         let dns_name: Name = get_network_dns_name(config.clone().network);
         info!(target: LOG_TARGET, "Tari Pulse Service initialized with DNS name: {}", dns_name);
-        Ok(Self { dns_name, config })
+        Ok(Self {
+            dns_name,
+            config,
+            shutdown_signal,
+        })
     }
 
     pub fn default_trust_anchor() -> TrustAnchor {
@@ -116,21 +122,55 @@ impl TariPulseService {
         notify_passed_checkpoints: watch::Sender,
     ) {
         let mut interval = time::interval(self.config.check_interval);
-        let mut interval_failed = time::interval(Duration::from_millis(100));
+        let mut shutdown_signal = self.shutdown_signal.clone();
+        // Upper bound for the retry back-off; never lower than the configured check interval.
+        let max_retry_interval = Duration::from_secs(60 * 30).max(self.config.check_interval);
+
         loop {
-            interval.tick().await;
-            let passed_checkpoints = match self.passed_checkpoints(&mut base_node_service).await {
-                Ok(passed) => passed,
-                Err(err) => {
-                    error!(target: LOG_TARGET, "Error checking if node passed checkpoints: {:?}", err);
-                    interval_failed.tick().await;
-                    continue;
-                },
-            };
+            tokio::select! {
+                _ = interval.tick() => {
+                    let passed_checkpoints = match self.passed_checkpoints(&mut base_node_service).await {
+                        Ok(passed) => {
+                            if interval.period() != self.config.check_interval {
+                                // Healthy again after backing off: return to the configured cadence. A new
+                                // interval completes its first tick immediately, so consume that tick here;
+                                // otherwise the next loop iteration would re-run the check without waiting.
+                                interval = time::interval(self.config.check_interval);
+                                interval.tick().await;
+                            }
+                            passed
+                        },
+                        Err(err) => {
+                            warn!(target: LOG_TARGET, "Failed to check if node has passed checkpoints: {:?}", err);
+                            let old_interval = interval.period();
+                            // Work on `Duration`s (not whole seconds) so a sub-second check interval cannot
+                            // collapse to a zero period, which `time::interval` rejects with a panic.
+                            let new_interval = old_interval.saturating_mul(2).min(max_retry_interval);
+                            if new_interval == old_interval {
+                                warn!(target: LOG_TARGET, "Reached maximum retry interval of 30 minutes.");
+                            } else {
+                                // increase interval if node repeatedly (up to 30 min) fails to fetch checkpoints
+                                interval = time::interval(new_interval);
+                                // consume the immediate first tick so the next check waits a full period
+                                interval.tick().await;
+                            }
+                            warn!(target: LOG_TARGET, "Retrying in {} seconds", new_interval.as_secs_f64());
+                            continue;
+                        },
+                    };
 
-            notify_passed_checkpoints
-                .send(!passed_checkpoints)
-                .expect("Channel should be open");
+                    notify_passed_checkpoints
+                        .send(!passed_checkpoints)
+                        .expect("Channel should be open");
+                },
+                _ = shutdown_signal.wait() => {
+                    info!(
+                        target: LOG_TARGET,
+                        "Tari Pulse shutting down because the shutdown signal was received"
+                    );
+                    break;
+                },
+            }
         }
     }
 
@@ -231,7 +271,7 @@ impl ServiceInitializer for TariPulseServiceInitializer {
 
         context.spawn_when_ready(move |handles| async move {
             let base_node_service = handles.expect_handle::();
-            let mut tari_pulse_service = TariPulseService::new(config)
+            let mut tari_pulse_service = TariPulseService::new(config, shutdown_signal.clone())
                 .await
                 .expect("Should be able to get the service");
             let tari_pulse_service = tari_pulse_service.run(base_node_service, sender);