Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

657 fast ping on startup #675

Merged
merged 12 commits into from
Feb 7, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ Bob versions changelog
## [Unreleased]
#### Added
- Quorum argument for manual workflow dispatch for integration tests (#749)
- Fast ping at the start (#657)

#### Changed
- Make local put parallel to remote (#573)
Expand Down
54 changes: 44 additions & 10 deletions bob/src/link_manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use crate::prelude::*;
use std::time::Instant;

const FAST_PING_PERIOD_MS: u64 = 100;
const FAST_PING_DURATION_SEC: u64 = 60;

#[derive(Debug)]
pub(crate) struct LinkManager {
Expand All @@ -19,26 +23,56 @@ impl LinkManager {
}

async fn checker_task(factory: Factory, nodes: Arc<[Node]>, period: Duration) {
let start = Instant::now();
let fast_log_iteration_div =
(period.as_millis() as usize / FAST_PING_PERIOD_MS as usize).max(1);
Self::checker(
&factory,
&nodes,
Duration::from_millis(FAST_PING_PERIOD_MS).min(period),
|| start.elapsed().as_secs() > FAST_PING_DURATION_SEC,
fast_log_iteration_div,
)
.await;
Self::checker(&factory, &nodes, period, || false, 1).await;
}

async fn checker(
factory: &Factory,
nodes: &[Node],
period: Duration,
should_stop: impl Fn() -> bool,
log_iteration_div: usize,
) {
let mut interval = interval(period);
loop {
let mut i: usize = 1;
while !should_stop() {
i = i.wrapping_add(1) % log_iteration_div;
let log_in_this_iter = i == 0;
interval.tick().await;
let mut err_cnt = 0;
let mut status = String::from("Node status: ");
for node in nodes.iter() {
if let Err(e) = node.check(&factory).await {
error!(
"No connection to {}:[{}] - {}",
node.name(),
node.address(),
e
);
status += &format!("[-]{:<10} ", node.name());
if log_in_this_iter {
error!(
"No connection to {}:[{}] - {}",
node.name(),
node.address(),
e
);
status += &format!("[-]{:<10} ", node.name());
}
err_cnt += 1;
} else {
status += &format!("[+]{:<10} ", node.name());
if log_in_this_iter {
status += &format!("[+]{:<10} ", node.name());
}
}
}
info!("{}", status);
if log_in_this_iter {
info!("{}", status);
}
let cnt = nodes.len() - err_cnt;
gauge!(AVAILABLE_NODES_COUNT, cnt as f64);
}
Expand Down