Revamps the cln-plugin to support retrying towers automatically
- Updates `Retrier::run` to return more meaningful errors. `Retrier::run` used to simply return a message;
it now returns `RetryError` variants so the different failure cases can be handled properly.
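
As an illustration of the kind of handling this enables, here is a minimal sketch of what such an error type could look like. The variant and method names are assumptions for illustration, not necessarily the ones used in `retrier.rs`:

```rust
/// Hypothetical sketch of a retry error type; the actual variants in the
/// plugin's retrier module may differ.
#[derive(Debug)]
pub enum RetryError {
    /// The retry gave up after accumulating transient errors; the retrier
    /// can go Idle and try again after `auto_retry_delay`.
    Transient(String),
    /// Retrying will not help (e.g. the tower misbehaved or rejected us).
    Permanent(String),
}

impl RetryError {
    /// Whether it makes sense to retry this tower again later on.
    pub fn is_transient(&self) -> bool {
        matches!(self, RetryError::Transient(_))
    }
}
```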

- Adds an additional state to `RetrierStatus`: Idle. Retries that fail due to an accumulation of transient errors are flagged
as Idle instead of Failed and retried later on (based on `auto_retry_delay`). Note that Retrier data is not kept in memory while a retrier is Idle. Instead, the data is cleared and loaded again from the database when the `Retrier` is ready to run again.
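
A minimal sketch of how the status enum and the idle transition could look, assuming the `is_running`/`is_idle` helpers used in the diff below; the real `RetrierStatus` may carry additional variants and data:

```rust
use std::time::{Duration, Instant};

/// Sketch of a retrier status including the new Idle state; the real enum
/// likely tracks more (e.g. the appointments being retried while Running).
#[derive(Clone, Debug, PartialEq)]
pub enum RetrierStatus {
    Running,
    /// Gave up on transient errors; will be retried after `auto_retry_delay`.
    Idle { since: Instant },
    Failed,
}

impl RetrierStatus {
    pub fn is_running(&self) -> bool {
        matches!(self, RetrierStatus::Running)
    }

    pub fn is_idle(&self) -> bool {
        matches!(self, RetrierStatus::Idle { .. })
    }

    /// Whether an idle retrier is due for another attempt.
    pub fn should_auto_retry(&self, auto_retry_delay: Duration) -> bool {
        match self {
            RetrierStatus::Idle { since } => since.elapsed() >= auto_retry_delay,
            _ => false,
        }
    }
}
```

The manager can then periodically check idle retriers and move them back to Running once `auto_retry_delay` has elapsed, reloading their pending data from the database at that point.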

- Revamps how revocation data is sent to the `RetryManager`: the `RetryManager` used to receive locators one by one via `unreachable_towers`, since they were mainly fed by `on_commitment_revocation`, which generates them one at a time. However, both when manually retrying and when bootstrapping from an already populated database, multiple appointments may be pending for the same tower, which meant calling `unreachable_towers.send` multiple times for the same tower. That in itself was not a big deal, since we didn't really need to differentiate between the cases. We do now, though.
In order to implement periodic retries while still allowing manual retries, we need to be able to signal the state transition to the
`Retrier` without providing any new data:

- If a Retrier is idle and we receive data through `on_commitment_revocation`, we need to append that data to the `Retrier`.
- If a Retrier is idle and we receive data through a manual retry, we need to change the state of the `Retrier` without
  adding any new data to it.

In order to implement this, we've added an additional map to `WTClient` that reports the state of the active retriers. Retriers are active only while they are running or idle.
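
For illustration, here is a rough sketch of the two pieces described above: the retriers map on `WTClient` and the wrapper sent through `unreachable_towers`. The `Fresh`/`Stale`/`None` variant names match the ones used in the diff below, but the placeholder types, container types, and channel type are assumptions:

```rust
use std::collections::{HashMap, HashSet};
use tokio::sync::mpsc::UnboundedSender;

// Placeholder types standing in for the teos/plugin ones (sketch only).
type TowerId = [u8; 33];
type Locator = [u8; 16];

/// Minimal stand-in; see the fuller RetrierStatus sketch above.
#[derive(Clone, PartialEq)]
enum RetrierStatus {
    Running,
    Idle,
}

/// Revocation data handed to the RetryManager through `unreachable_towers`.
enum RevocationData {
    /// A single locator coming straight from `on_commitment_revocation`.
    Fresh(Locator),
    /// A batch of pending locators loaded from the database (manual retry or bootstrap).
    Stale(HashSet<Locator>),
    /// No data at all: just a signal to wake up an idle retrier, which will
    /// reload its pending appointments from the database.
    None,
}

struct WTClient {
    /// State of the active retriers. Retriers only live here while running or idle.
    retriers: HashMap<TowerId, RetrierStatus>,
    /// Channel feeding the RetryManager (assumed to be a tokio unbounded channel).
    unreachable_towers: UnboundedSender<(TowerId, RevocationData)>,
}
```

With this in place, a fresh locator from `on_commitment_revocation` is only sent when the retrier is missing or running, while a manual retry on an idle retrier sends `RevocationData::None` just to wake it up, as the diff below shows.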

- Also reworks `WTClient::set_tower_status` to update the status only if the new one differs from the current one.
This is simply to reduce the boilerplate of having to perform this check in other parts of the plugin codebase.
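
A sketch of what that check could look like, assuming `WTClient` keeps per-tower summaries with a `status` field; the actual method may do more (e.g. persist the change):

```rust
use std::collections::HashMap;

// Placeholder types for the sketch; the plugin defines its own TowerId,
// TowerStatus and tower summary types.
type TowerId = [u8; 33];

#[derive(Clone, Copy, PartialEq, Debug)]
enum TowerStatus {
    Reachable,
    TemporaryUnreachable,
    Unreachable,
    SubscriptionError,
}

struct TowerSummary {
    status: TowerStatus,
}

struct WTClient {
    towers: HashMap<TowerId, TowerSummary>,
}

impl WTClient {
    /// Sets the tower status to `status` only if it differs from the current
    /// one, so callers don't have to perform the check themselves.
    fn set_tower_status(&mut self, tower_id: TowerId, status: TowerStatus) {
        if let Some(tower) = self.towers.get_mut(&tower_id) {
            if tower.status != status {
                tower.status = status;
            }
        }
    }
}
```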
sr-gi committed Jan 10, 2023
1 parent 909a0da commit 07caee2
Showing 3 changed files with 534 additions and 172 deletions.
123 changes: 82 additions & 41 deletions watchtower-plugin/src/main.rs
@@ -1,7 +1,7 @@
use std::convert::TryFrom;
use std::env;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, MutexGuard};

use home::home_dir;
use serde_json::json;
@@ -21,7 +21,7 @@ use watchtower_plugin::net::http::{
self, post_request, process_post_response, AddAppointmentError, ApiResponse, RequestError,
};
use watchtower_plugin::retrier::RetryManager;
use watchtower_plugin::wt_client::WTClient;
use watchtower_plugin::wt_client::{RevocationData, WTClient};
use watchtower_plugin::TowerStatus;

fn to_cln_error(e: RequestError) -> Error {
@@ -34,6 +34,27 @@ fn to_cln_error(e: RequestError) -> Error {
e
}

/// Sends fresh data to a retrier as long as it does not exist, or it does exist and is running.
fn send_to_retrier(state: &MutexGuard<WTClient>, tower_id: TowerId, locator: Locator) {
if if let Some(status) = state.get_retrier_status(&tower_id) {
// A retrier in the retriers map can only be running or idle
status.is_running()
} else {
true
} {
state
.unreachable_towers
.send((tower_id, RevocationData::Fresh(locator)))
.unwrap();
} else {
log::debug!(
"Not sending data to idle retrier ({}, {})",
tower_id,
locator
)
}
}

/// Registers the client to a given tower.
///
/// Accepted tower_id formats:
@@ -286,38 +307,52 @@ async fn get_tower_info(

/// Triggers a manual retry of a tower, tries to send all pending appointments to it.
///
/// Only works if the tower is unreachable or there's been a subscription error.
/// Only works if the tower is unreachable or there's been a subscription error (and the tower is not already being retried).
async fn retry_tower(
plugin: Plugin<Arc<Mutex<WTClient>>>,
v: serde_json::Value,
) -> Result<serde_json::Value, Error> {
let tower_id = TowerId::try_from(v).map_err(|e| anyhow!(e))?;
let state = plugin.state().lock().unwrap();
if let Some(status) = state.get_tower_status(&tower_id) {
if status.is_temporary_unreachable() {
return Err(anyhow!("{} is already being retried", tower_id));
} else if !status.is_retryable() {
return Err(anyhow!(
"Tower status must be unreachable or have a subscription issue to manually retry",
));
}

for locator in state
.towers
.get(&tower_id)
.unwrap()
.pending_appointments
.iter()
{
if let Some(tower_status) = state.get_tower_status(&tower_id) {
if let Some(retrier_status) = state.retriers.get(&tower_id) {
if retrier_status.is_idle() {
// We don't send any associated data in this case given the idle retrier already has it all.
state
.unreachable_towers
.send((tower_id, RevocationData::None))
.map_err(|e| anyhow!(e))?;
} else {
// Status can only be running or idle for data in the retriers map.
return Err(anyhow!("{} is already being retried", tower_id));
}
} else if tower_status.is_retryable() {
// We do send associated data here given there is no retrier associated to this tower.
state
.unreachable_towers
.send((tower_id, *locator))
.send((
tower_id,
RevocationData::Stale(
state
.towers
.get(&tower_id)
.unwrap()
.pending_appointments
.iter()
.cloned()
.collect(),
),
))
.map_err(|e| anyhow!(e))?;
} else {
return Err(anyhow!(
"Tower status must be unreachable or have a subscription issue to manually retry",
));
}
Ok(json!(format!("Retrying {}", tower_id)))
} else {
Err(anyhow!("Unknown tower {}", tower_id))
return Err(anyhow!("Unknown tower {}", tower_id));
}
Ok(json!(format!("Retrying {}", tower_id)))
}

/// Forgets about a tower wiping out all local data associated to it.
@@ -410,11 +445,7 @@ async fn on_commitment_revocation(
let mut state = plugin.state().lock().unwrap();
state.set_tower_status(tower_id, TowerStatus::TemporaryUnreachable);
state.add_pending_appointment(tower_id, &appointment);

state
.unreachable_towers
.send((tower_id, appointment.locator))
.unwrap();
send_to_retrier(&state, tower_id, appointment.locator);
}
}
AddAppointmentError::ApiError(e) => match e.error_code {
@@ -427,11 +458,7 @@ async fn on_commitment_revocation(
let mut state = plugin.state().lock().unwrap();
state.set_tower_status(tower_id, TowerStatus::SubscriptionError);
state.add_pending_appointment(tower_id, &appointment);

state
.unreachable_towers
.send((tower_id, appointment.locator))
.unwrap();
send_to_retrier(&state, tower_id, appointment.locator);
}

_ => {
@@ -482,11 +509,8 @@ async fn on_commitment_revocation(
let mut state = plugin.state().lock().unwrap();
state.add_pending_appointment(tower_id, &appointment);

if status.is_temporary_unreachable() {
state
.unreachable_towers
.send((tower_id, appointment.locator))
.unwrap();
if !status.is_unreachable() {
send_to_retrier(&state, tower_id, appointment.locator);
}
}
}
@@ -517,7 +541,11 @@ async fn main() -> Result<(), Error> {
"watchtower-proxy",
Value::OptString,
"Socks v5 proxy IP address and port for the watchtower client",
))
)).option(ConfigOption::new(
"watchtower-auto-retry-delay",
Value::Integer(86400),
"the time (in seconds) that a retrier will wait before auto-retrying a failed tower. Defaults to once a day",
))
.option(ConfigOption::new(
"dev-watchtower-max-retry-interval",
Value::Integer(60),
Expand Down Expand Up @@ -593,6 +621,13 @@ async fn main() -> Result<(), Error> {
// We will never end up here, but we need to define an else. Should be fixed alongside the previous fixme.
900
};
let auto_retry_delay =
if let Value::Integer(x) = midstate.option("watchtower-auto-retry-delay").unwrap() {
x as u16
} else {
// We will never end up here, but we need to define an else. Should be fixed alongside the previous fixme.
3600
};
let max_interval_time = if let Value::Integer(x) = midstate
.option("dev-watchtower-max-retry-interval")
.unwrap()
@@ -605,9 +640,15 @@ async fn main() -> Result<(), Error> {

let plugin = midstate.start(wt_client.clone()).await?;
tokio::spawn(async move {
RetryManager::new(wt_client, rx, max_elapsed_time, max_interval_time)
.manage_retry()
.await
RetryManager::new(
wt_client,
rx,
max_elapsed_time,
auto_retry_delay,
max_interval_time,
)
.manage_retry()
.await
});
plugin.join().await
}