Skip to content

Commit

Permalink
fix(tari-pulse): await shutdown signal in main loop (#6696)
Browse files Browse the repository at this point in the history
Description
---
Await shutdown signal and reduce error logging when node failed to
compare blocks hashes.

Motivation and Context
---
Error logs were spamming on not supported network, greatly reducing
readability and shutdown signal wasn't working propely.

How Has This Been Tested?
---
Run local node on esmeralda network (which is not supported yet) and
watch for error logs. Also check shutdown behavior.

What process can a PR reviewer use to test or verify this change?
---
Same as above


Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify

---------

Co-authored-by: SW van Heerden <[email protected]>
Co-authored-by: Hansie Odendaal <[email protected]>
  • Loading branch information
3 people authored Nov 26, 2024
1 parent d22ef65 commit 321e9ba
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 18 deletions.
2 changes: 1 addition & 1 deletion applications/minotari_node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ pub struct BaseNodeConfig {
pub state_machine: BaseNodeStateMachineConfig,
/// Obscure GRPC error responses
pub report_grpc_error: bool,
/// Interval to check if the base node is still in sync with the network
// Interval to check if the base node is still in sync with the network
#[serde(with = "serializers::seconds")]
pub tari_pulse_interval: Duration,
}
Expand Down
63 changes: 46 additions & 17 deletions base_layer/core/src/base_node/tari_pulse_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use hickory_client::{
rr::{DNSClass, Name, RData, Record, RecordType},
tcp::TcpClientStream,
};
use log::{error, info};
use log::{error, info, warn};
use serde::{Deserialize, Serialize};
use tari_p2p::Network;
use tari_service_framework::{async_trait, ServiceInitializationError, ServiceInitializer, ServiceInitializerContext};
Expand Down Expand Up @@ -75,13 +75,18 @@ fn get_network_dns_name(network: Network) -> Name {
pub struct TariPulseService {
dns_name: Name,
config: TariPulseConfig,
shutdown_signal: ShutdownSignal,
}

impl TariPulseService {
pub async fn new(config: TariPulseConfig) -> Result<Self, anyhow::Error> {
pub async fn new(config: TariPulseConfig, shutdown_signal: ShutdownSignal) -> Result<Self, anyhow::Error> {
let dns_name: Name = get_network_dns_name(config.clone().network);
info!(target: LOG_TARGET, "Tari Pulse Service initialized with DNS name: {}", dns_name);
Ok(Self { dns_name, config })
Ok(Self {
dns_name,
config,
shutdown_signal,
})
}

pub fn default_trust_anchor() -> TrustAnchor {
Expand Down Expand Up @@ -116,21 +121,45 @@ impl TariPulseService {
notify_passed_checkpoints: watch::Sender<bool>,
) {
let mut interval = time::interval(self.config.check_interval);
let mut interval_failed = time::interval(Duration::from_millis(100));
let mut shutdown_signal = self.shutdown_signal.clone();

loop {
interval.tick().await;
let passed_checkpoints = match self.passed_checkpoints(&mut base_node_service).await {
Ok(passed) => passed,
Err(err) => {
error!(target: LOG_TARGET, "Error checking if node passed checkpoints: {:?}", err);
interval_failed.tick().await;
continue;
},
};
tokio::select! {
_ = interval.tick() => {
let passed_checkpoints = match self.passed_checkpoints(&mut base_node_service).await {
Ok(passed) => {
interval = time::interval(self.config.check_interval); // reset interval if back to healthy
passed
},
Err(err) => {
warn!(target: LOG_TARGET, "Failed to check if node has passed checkpoints: {:?}", err);
let old_interval = interval.period().as_secs();
let new_interval = if old_interval > (60 * 30) {
warn!(target: LOG_TARGET, "Reached maximum retry interval of 30 minutes.");
old_interval
} else {
// increase interval if node repeatedly (up to 30 min) fails to fetch checkpoints
interval = time::interval(Duration::from_secs(old_interval * 2));
interval.tick().await;
interval.period().as_secs()
};
warn!(target: LOG_TARGET, "Retrying in {} seconds", new_interval);
continue;
},
};

notify_passed_checkpoints
.send(!passed_checkpoints)
.expect("Channel should be open");
notify_passed_checkpoints
.send(!passed_checkpoints)
.expect("Channel should be open");
},
_ = shutdown_signal.wait() => {
info!(
target: LOG_TARGET,
"Tari Pulse shutting down because the shutdown signal was received"
);
break;
},
}
}
}

Expand Down Expand Up @@ -231,7 +260,7 @@ impl ServiceInitializer for TariPulseServiceInitializer {

context.spawn_when_ready(move |handles| async move {
let base_node_service = handles.expect_handle::<LocalNodeCommsInterface>();
let mut tari_pulse_service = TariPulseService::new(config)
let mut tari_pulse_service = TariPulseService::new(config, shutdown_signal.clone())
.await
.expect("Should be able to get the service");
let tari_pulse_service = tari_pulse_service.run(base_node_service, sender);
Expand Down

0 comments on commit 321e9ba

Please sign in to comment.