Merge pull request #184 from sr-gi/retrier-waits
Defines Retrier polling waiting time as a constant and fixes tests
sr-gi authored Feb 6, 2023
2 parents 4ce986e + 73a9bff commit 0cf3d97
111 changes: 65 additions & 46 deletions watchtower-plugin/src/retrier.rs
@@ -16,6 +16,8 @@ use crate::net::http::{self, AddAppointmentError};
use crate::wt_client::{RevocationData, WTClient};
use crate::{MisbehaviorProof, TowerStatus};

const POLLING_TIME: u64 = 1;

#[derive(Eq, PartialEq, Debug)]
enum RetryError {
// bool marks whether the Subscription error is permanent or not
@@ -167,7 +169,7 @@ impl RetryManager {
}
}
// Sleep to not waste a lot of CPU cycles.
tokio::time::sleep(Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_secs(POLLING_TIME)).await;
}
Err(TryRecvError::Disconnected) => break,
}
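
The two hunks above are the non-test half of the change: manage_retry polls its channel for retry requests in a loop and sleeps between rounds so an idle manager does not spin, and the hard-coded one-second sleep now goes through the new POLLING_TIME constant so the tests can reference the same value. A minimal sketch of that polling pattern, assuming a tokio unbounded channel as in the surrounding code (simplified, not the plugin's actual loop):

use std::time::Duration;
use tokio::sync::mpsc::{error::TryRecvError, UnboundedReceiver};

const POLLING_TIME: u64 = 1;

async fn poll_loop(mut rx: UnboundedReceiver<u32>) {
    loop {
        match rx.try_recv() {
            // A retry request arrived; hand it to the corresponding retrier.
            Ok(item) => println!("retrying {item}"),
            // Nothing to do yet. Sleep to not waste CPU cycles.
            Err(TryRecvError::Empty) => {
                tokio::time::sleep(Duration::from_secs(POLLING_TIME)).await
            }
            // All senders are gone, so no more work will ever arrive.
            Err(TryRecvError::Disconnected) => break,
        }
    }
}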
@@ -599,8 +601,10 @@ mod tests {
const LONG_AUTO_RETRY_DELAY: u32 = 60;
const SHORT_AUTO_RETRY_DELAY: u32 = 3;
const API_DELAY: f64 = 0.5;
const HALF_API_DELAY: f64 = API_DELAY / 2.0;
const MAX_ELAPSED_TIME: u16 = 2;
const MAX_INTERVAL_TIME: u16 = 1;
const MAX_RUN_TIME: f64 = 0.2;

impl Retrier {
fn empty(wt_client: Arc<Mutex<WTClient>>, tower_id: TowerId) -> Self {
@@ -658,6 +662,9 @@
});

// Start the task and send the tower to the channel for retry
tx.send((tower_id, RevocationData::Fresh(appointment.locator)))
.unwrap();

let wt_client_clone = wt_client.clone();
let task = tokio::spawn(async move {
RetryManager::new(
@@ -670,18 +677,18 @@
.manage_retry()
.await
});
tx.send((tower_id, RevocationData::Fresh(appointment.locator)))
.unwrap();

// Wait for the elapsed time and check how the tower status changed
tokio::time::sleep(Duration::from_secs((API_DELAY / 2.0) as u64)).await;
// Wait for a fraction of the API delay and check how the tower status changed
tokio::time::sleep(Duration::from_secs_f64(HALF_API_DELAY)).await;
assert!(wt_client
.lock()
.unwrap()
.get_retrier_status(&tower_id)
.unwrap()
.is_running());
tokio::time::sleep(Duration::from_secs(MAX_ELAPSED_TIME as u64)).await;

// Wait for the remaining time and re-check
tokio::time::sleep(Duration::from_secs_f64(MAX_RUN_TIME + HALF_API_DELAY)).await;

let state = wt_client.lock().unwrap();
assert_eq!(
@@ -727,24 +734,24 @@
.add_pending_appointment(tower_id, &appointment);

// Start the task and send the tower to the channel for retry
let wt_client_clone = wt_client.clone();

let mut retry_manager = RetryManager::new(
wt_client_clone,
rx,
MAX_ELAPSED_TIME + 1,
SHORT_AUTO_RETRY_DELAY,
MAX_INTERVAL_TIME,
);
let task = tokio::spawn(async move { retry_manager.manage_retry().await });
tx.send((tower_id, RevocationData::Fresh(appointment.locator)))
.unwrap();

// Wait for the elapsed time and check how the tower status changed
tokio::time::sleep(Duration::from_secs_f64(
(MAX_ELAPSED_TIME as f64 + 1.0) / 2.0,
))
.await;
let wt_client_clone = wt_client.clone();
let task = tokio::spawn(async move {
RetryManager::new(
wt_client_clone,
rx,
MAX_ELAPSED_TIME,
SHORT_AUTO_RETRY_DELAY,
MAX_INTERVAL_TIME,
)
.manage_retry()
.await
});

// Wait for one retry round and check the tower status
tokio::time::sleep(Duration::from_secs_f64(MAX_RUN_TIME)).await;
assert!(wt_client
.lock()
.unwrap()
@@ -758,8 +765,9 @@
.unwrap()
.is_running());

// Wait until the task gives up and check again (this gives up due to accumulation of transient errors, so the retriers will be idle).
// Notice we'd normally wait for MAX_ELAPSED_TIME + MAX_RUN_TIME (the maximum time a Retrier can be working plus the marginal time of the last retry).
// However, we've already waited for MAX_RUN_TIME right before to check the tower was temporarily unreachable, so we don't need to account for that again.
tokio::time::sleep(Duration::from_secs(MAX_ELAPSED_TIME as u64)).await;
assert!(wt_client
.lock()
@@ -867,6 +875,9 @@
});

// Start the task and send the tower to the channel for retry
tx.send((tower_id, RevocationData::Fresh(appointment.locator)))
.unwrap();

let wt_client_clone = wt_client.clone();
let task = tokio::spawn(async move {
RetryManager::new(
@@ -879,18 +890,18 @@
.manage_retry()
.await
});
tx.send((tower_id, RevocationData::Fresh(appointment.locator)))
.unwrap();
// Wait for the elapsed time and check how the tower status changed
tokio::time::sleep(Duration::from_secs((API_DELAY / 2.0) as u64)).await;

// Wait for a fraction of the API delay and check how the tower status changed
tokio::time::sleep(Duration::from_secs_f64(HALF_API_DELAY)).await;
assert!(wt_client
.lock()
.unwrap()
.get_retrier_status(&tower_id)
.unwrap()
.is_running());

tokio::time::sleep(Duration::from_secs(MAX_ELAPSED_TIME as u64)).await;
// Wait for the remaining time and re-check
tokio::time::sleep(Duration::from_secs_f64(MAX_RUN_TIME + HALF_API_DELAY)).await;
assert_eq!(
wt_client
.lock()
@@ -965,6 +976,9 @@
});

// Start the task and send the tower to the channel for retry
tx.send((tower_id, RevocationData::Fresh(appointment.locator)))
.unwrap();

let wt_client_clone = wt_client.clone();
let task = tokio::spawn(async move {
RetryManager::new(
@@ -977,25 +991,27 @@
.manage_retry()
.await
});
tx.send((tower_id, RevocationData::Fresh(appointment.locator)))
.unwrap();

// Wait for the elapsed time and check how the tower status changed
tokio::time::sleep(Duration::from_secs_f64(API_DELAY / 2.0)).await;
// Wait for a fraction of the API delay and check how the tower status changed
tokio::time::sleep(Duration::from_secs_f64(HALF_API_DELAY)).await;
assert!(wt_client
.lock()
.unwrap()
.get_retrier_status(&tower_id)
.unwrap()
.is_running());

tokio::time::sleep(Duration::from_secs(MAX_ELAPSED_TIME as u64)).await;
// Wait for the remaining time and re-check
tokio::time::sleep(Duration::from_secs_f64(HALF_API_DELAY + MAX_RUN_TIME)).await;
assert!(wt_client
.lock()
.unwrap()
.get_tower_status(&tower_id)
.unwrap()
.is_misbehaving());

// Retriers are wiped every polling interval, so we'll need to wait a bit more to check it
tokio::time::sleep(Duration::from_secs(POLLING_TIME)).await;
assert!(!wt_client.lock().unwrap().retriers.contains_key(&tower_id));
api_mock.assert();

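Most of the test churn in these hunks traces back to one pitfall, which is likely what the commit message's "fixes tests" refers to: the old Duration::from_secs((API_DELAY / 2.0) as u64) casts the float to u64 before building the Duration, truncating 0.25 down to 0, so the "half of the API delay" sleeps returned immediately. The new HALF_API_DELAY constant paired with Duration::from_secs_f64 keeps the fractional seconds. A standalone check of both behaviors:

use std::time::Duration;

const API_DELAY: f64 = 0.5;
const HALF_API_DELAY: f64 = API_DELAY / 2.0;

fn main() {
    // The old pattern: the `as u64` cast truncates 0.25 to 0,
    // so the resulting sleep is zero seconds long.
    assert_eq!(Duration::from_secs((API_DELAY / 2.0) as u64), Duration::ZERO);

    // The new pattern keeps the fractional part: a real 250 ms wait.
    assert_eq!(
        Duration::from_secs_f64(HALF_API_DELAY),
        Duration::from_millis(250)
    );
}
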
@@ -1025,6 +1041,8 @@
wt_client.lock().unwrap().remove_tower(tower_id).unwrap();

// Start the task and send the tower to the channel for retry
tx.send((tower_id, RevocationData::None)).unwrap();

let wt_client_clone = wt_client.clone();
let task = tokio::spawn(async move {
RetryManager::new(
@@ -1038,9 +1056,6 @@
.await
});

// Send a retry request and check how the tower is removed
tx.send((tower_id, RevocationData::None)).unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
assert!(!wt_client.lock().unwrap().towers.contains_key(&tower_id));

task.abort();
@@ -1085,7 +1100,6 @@
let add_appointment_mock = server.mock(|when, then| {
when.method(POST).path(Endpoint::AddAppointment.path());
then.status(200)
.delay(Duration::from_secs_f64(API_DELAY))
.header("content-type", "application/json")
.json_body(json!(add_appointment_response));
});
@@ -1095,7 +1109,7 @@
get_registration_receipt_from_previous(&registration_receipt);
re_registration_receipt.sign(&tower_sk);
let register_mock = server.mock(|when, then| {
when.method(POST).path("/register");
when.method(POST).path(Endpoint::Register.path());
then.status(200)
.delay(Duration::from_secs_f64(API_DELAY))
.header("content-type", "application/json")
@@ -1109,6 +1123,9 @@
.set_tower_status(tower_id, TowerStatus::SubscriptionError);

// Start the task and send the tower to the channel for retry
tx.send((tower_id, RevocationData::Fresh(appointment.locator)))
.unwrap();

let wt_client_clone = wt_client.clone();
let task = tokio::spawn(async move {
RetryManager::new(
@@ -1121,19 +1138,18 @@
.manage_retry()
.await
});
tx.send((tower_id, RevocationData::Fresh(appointment.locator)))
.unwrap();

tokio::time::sleep(Duration::from_secs_f64(API_DELAY / 2.0)).await;
// Wait for a fraction of the API delay and check how the tower status changed
tokio::time::sleep(Duration::from_secs_f64(HALF_API_DELAY)).await;
assert!(wt_client
.lock()
.unwrap()
.get_retrier_status(&tower_id)
.unwrap()
.is_running());

// Wait for the elapsed time and check how the tower status changed
tokio::time::sleep(Duration::from_secs(MAX_ELAPSED_TIME as u64)).await;
// Wait for the remaining time and re-check
tokio::time::sleep(Duration::from_secs_f64(MAX_RUN_TIME + HALF_API_DELAY)).await;
let state = wt_client.lock().unwrap();
assert!(!state.retriers.contains_key(&tower_id));

@@ -1192,7 +1208,10 @@

{
// After the retrier gives up, it should go idle and flag the tower as unreachable
tokio::time::sleep(Duration::from_secs((MAX_ELAPSED_TIME) as u64)).await;
tokio::time::sleep(Duration::from_secs_f64(
MAX_ELAPSED_TIME as f64 + MAX_RUN_TIME,
))
.await;
let state = wt_client.lock().unwrap();
assert!(state.get_retrier_status(&tower_id).unwrap().is_idle());

@@ -1212,7 +1231,7 @@
.unwrap();

{
tokio::time::sleep(Duration::from_secs(2)).await;
tokio::time::sleep(Duration::from_secs_f64(POLLING_TIME as f64 + MAX_RUN_TIME)).await;
let state = wt_client.lock().unwrap();
assert!(state.get_retrier_status(&tower_id).unwrap().is_idle());
let tower = state.towers.get(&tower_id).unwrap();
@@ -1261,7 +1280,7 @@
// Send a retry flag to the retrier to force a retry.
tx.send((tower_id, RevocationData::None)).unwrap();

tokio::time::sleep(Duration::from_secs(MAX_ELAPSED_TIME as u64)).await;
tokio::time::sleep(Duration::from_secs_f64(POLLING_TIME as f64 + MAX_RUN_TIME)).await;
// FIXME: Here we should be able to check this, however, due to httpmock limitations, we cannot return a response based on the request.
// Therefore, both requests will be responded to with the same data. Given pending_appointments is a HashSet, we cannot even know which request
// will be sent first (sets are initialized with a random state, which decides the order of iteration).
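
Taken together, the new test waits compose the named constants rather than magic numbers. A recap sketch (constant values copied from the diff; the local names are mine, not the tests'):

use std::time::Duration;

const API_DELAY: f64 = 0.5; // mocked server response delay
const HALF_API_DELAY: f64 = API_DELAY / 2.0;
const MAX_ELAPSED_TIME: u16 = 2; // retry budget in seconds
const MAX_RUN_TIME: f64 = 0.2; // margin for a single retry round
const POLLING_TIME: u64 = 1; // manager's channel-polling period

fn main() {
    // Mid-request: half the API delay in, the retrier must still be running.
    let running_check = Duration::from_secs_f64(HALF_API_DELAY);
    // Completion: the other half of the delay plus one round's margin.
    let done_check = Duration::from_secs_f64(HALF_API_DELAY + MAX_RUN_TIME);
    // Give-up: the whole retry budget plus the final round's margin.
    let idle_check = Duration::from_secs_f64(MAX_ELAPSED_TIME as f64 + MAX_RUN_TIME);
    // Cleanup: finished retriers are wiped on the next polling round.
    let wiped_check = Duration::from_secs_f64(POLLING_TIME as f64 + MAX_RUN_TIME);
    println!("{running_check:?} {done_check:?} {idle_check:?} {wiped_check:?}");
}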
