Commit

tari pulse
hansieodendaal committed Nov 27, 2024
1 parent 7e2a305 commit fbd4fd0
Showing 1 changed file with 29 additions and 23 deletions.
52 changes: 29 additions & 23 deletions applications/minotari_node/src/tari_pulse_service/mod.rs
@@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

-use std::{str::FromStr, time::Duration};
+use std::{cmp::max, str::FromStr, time::Duration};

use futures::future;
use hickory_client::{
@@ -33,14 +33,14 @@ use hickory_client::{
rr::{DNSClass, Name, RData, Record, RecordType},
tcp::TcpClientStream,
};
-use log::{error, info, trace, warn};
+use log::{debug, error, info, trace, warn};
use serde::{Deserialize, Serialize};
use tari_core::base_node::{comms_interface::CommsInterfaceError, LocalNodeCommsInterface};
use tari_p2p::Network;
use tari_service_framework::{async_trait, ServiceInitializationError, ServiceInitializer, ServiceInitializerContext};
use tari_shutdown::ShutdownSignal;
use tari_utilities::hex::Hex;
-use tokio::{net::TcpStream as TokioTcpStream, sync::watch, time};
+use tokio::{net::TcpStream as TokioTcpStream, sync::watch, time, time::MissedTickBehavior};

const LOG_TARGET: &str = "c::bn::tari_pulse";
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -119,31 +119,37 @@ impl TariPulseService {
        notify_passed_checkpoints: watch::Sender<bool>,
    ) {
        let mut interval = time::interval(self.config.check_interval);
+        interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
        tokio::pin!(interval);
        let mut shutdown_signal = self.shutdown_signal.clone();
+        let mut count = 0u64;
+        let mut skip_ticks = 0;
+        let mut skipped_ticks = 0;

        loop {
            tokio::select! {
                _ = interval.tick() => {
-                    let passed_checkpoints = match self.passed_checkpoints(&mut base_node_service).await {
-                        Ok(passed) => {
-                            interval = time::interval(self.config.check_interval); // reset interval if back to healthy
-                            passed
-                        },
-                        Err(err) => {
-                            warn!(target: LOG_TARGET, "Failed to check if node has passed checkpoints: {:?}", err);
-                            let old_interval = interval.period().as_secs();
-                            let new_interval = if old_interval > (60 * 30) {
-                                warn!(target: LOG_TARGET, "Reached maximum retry interval of 30 minutes.");
-                                old_interval
-                            } else {
-                                // increase interval if node repeatedly (up to 30 min) fails to fetch checkpoints
-                                interval = time::interval(Duration::from_secs(old_interval * 2));
-                                interval.tick().await;
-                                interval.period().as_secs()
-                            };
-                            warn!(target: LOG_TARGET, "Retrying in {} seconds", new_interval);
-                            continue;
-                        },
+                    count += 1;
+                    trace!(target: LOG_TARGET, "Interval tick: {}", count);
+                    if skipped_ticks < skip_ticks {
+                        skipped_ticks += 1;
+                        debug!(target: LOG_TARGET, "Skipping {} of {} ticks", skipped_ticks, skip_ticks);
+                        continue;
+                    }
+                    let passed_checkpoints = {
+                        match self.passed_checkpoints(&mut base_node_service).await {
+                            Ok(passed) => {
+                                skip_ticks = 0;
+                                skipped_ticks = 0;
+                                passed
+                            },
+                            Err(err) => {
+                                warn!(target: LOG_TARGET, "Failed to check if node has passed checkpoints: {:?}", err);
+                                skip_ticks = max(skip_ticks + 1, 30 * 60 / self.config.check_interval.as_secs());
+                                skipped_ticks = 0;
+                                continue;
+                            },
+                        }
                    };

                    notify_passed_checkpoints
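Note on the change: the previous code retried a failed checkpoint query by rebuilding the tokio interval with a doubled period (capped at 30 minutes), while the new code keeps the interval at its configured period, sets MissedTickBehavior::Delay, and instead skips a number of subsequent ticks after a failure before querying again. A minimal, self-contained sketch of that tick-skipping backoff is shown below; the do_check() stand-in, the 60-second period, and the main() wrapper are illustrative assumptions, not part of the Tari source.

use std::{cmp::max, time::Duration};

use tokio::time::{self, MissedTickBehavior};

// Illustrative stand-in for TariPulseService::passed_checkpoints().
async fn do_check() -> Result<bool, ()> {
    Ok(true)
}

#[tokio::main]
async fn main() {
    let check_interval = Duration::from_secs(60); // assumed value; the real one comes from config
    let mut interval = time::interval(check_interval);
    // Don't burst to catch up if a tick was missed; wait one full period instead.
    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

    let mut skip_ticks: u64 = 0; // how many upcoming ticks to skip after a failure
    let mut skipped_ticks: u64 = 0; // how many of those have been skipped so far

    loop {
        interval.tick().await;

        if skipped_ticks < skip_ticks {
            // Still backing off from an earlier failure.
            skipped_ticks += 1;
            continue;
        }

        match do_check().await {
            Ok(_passed) => {
                // Healthy again: run the check on every tick from now on.
                skip_ticks = 0;
                skipped_ticks = 0;
            },
            Err(_) => {
                // As in the diff, `max` pushes the skip count straight to the
                // ~30-minute ceiling (30 * 60 seconds / tick period) on the first failure.
                skip_ticks = max(skip_ticks + 1, 30 * 60 / check_interval.as_secs());
                skipped_ticks = 0;
            },
        }
    }
}

Skipping ticks instead of re-creating the timer leaves the interval (and the surrounding tokio::select! with the shutdown signal) untouched. Note that, as committed, max(...) raises skip_ticks straight to the roughly 30-minute ceiling on the first failure rather than ramping up gradually.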
