Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use the user-provided SleepFuture for interval checks in background-processor #1962

Merged
merged 3 commits into from
Jan 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions lightning-background-processor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ rustdoc-args = ["--cfg", "docsrs"]

[features]
futures = [ "futures-util" ]
std = ["lightning/std", "lightning-rapid-gossip-sync/std"]

default = ["std"]

[dependencies]
bitcoin = "0.29.0"
lightning = { version = "0.0.113", path = "../lightning", features = ["std"] }
lightning-rapid-gossip-sync = { version = "0.0.113", path = "../lightning-rapid-gossip-sync" }
bitcoin = { version = "0.29.0", default-features = false }
lightning = { version = "0.0.113", path = "../lightning", default-features = false }
lightning-rapid-gossip-sync = { version = "0.0.113", path = "../lightning-rapid-gossip-sync", default-features = false }
futures-util = { version = "0.3", default-features = false, features = ["async-await-macro"], optional = true }

[dev-dependencies]
Expand Down
90 changes: 61 additions & 29 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@

#![cfg_attr(docsrs, feature(doc_auto_cfg))]

#![cfg_attr(all(not(feature = "std"), not(test)), no_std)]

#[cfg(any(test, feature = "std"))]
extern crate core;

#[macro_use] extern crate lightning;
extern crate lightning_rapid_gossip_sync;

Expand All @@ -28,15 +33,22 @@ use lightning::util::events::{Event, EventHandler, EventsProvider};
use lightning::util::logger::Logger;
use lightning::util::persist::Persister;
use lightning_rapid_gossip_sync::RapidGossipSync;
use lightning::io;

use core::ops::Deref;
use core::time::Duration;

#[cfg(feature = "std")]
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
use std::ops::Deref;
#[cfg(feature = "std")]
use core::sync::atomic::{AtomicBool, Ordering};
wpaulino marked this conversation as resolved.
Show resolved Hide resolved
#[cfg(feature = "std")]
use std::thread::{self, JoinHandle};
#[cfg(feature = "std")]
use std::time::Instant;

#[cfg(feature = "futures")]
use futures_util::{select_biased, future::FutureExt};
use futures_util::{select_biased, future::FutureExt, task};

/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
Expand All @@ -62,6 +74,7 @@ use futures_util::{select_biased, future::FutureExt};
///
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::util::events::Event
#[cfg(feature = "std")]
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
stop_thread: Arc<AtomicBool>,
Expand Down Expand Up @@ -207,15 +220,15 @@ macro_rules! define_run_body {
($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
$channel_manager: ident, $process_channel_manager_events: expr,
$gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
$loop_exit_check: expr, $await: expr)
$loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr)
wpaulino marked this conversation as resolved.
Show resolved Hide resolved
=> { {
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
$channel_manager.timer_tick_occurred();

let mut last_freshness_call = Instant::now();
let mut last_ping_call = Instant::now();
let mut last_prune_call = Instant::now();
let mut last_scorer_persist_call = Instant::now();
let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
let mut last_ping_call = $get_timer(PING_TIMER);
let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
let mut have_pruned = false;

loop {
Expand All @@ -237,9 +250,9 @@ macro_rules! define_run_body {

// We wait up to 100ms, but track how long it takes to detect being put to sleep,
// see `await_start`'s use below.
let await_start = Instant::now();
let mut await_start = $get_timer(1);
let updates_available = $await;
let await_time = await_start.elapsed();
let await_slow = $timer_elapsed(&mut await_start, 1);

if updates_available {
log_trace!($logger, "Persisting ChannelManager...");
Expand All @@ -251,12 +264,12 @@ macro_rules! define_run_body {
log_trace!($logger, "Terminating background processor.");
break;
}
if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
$channel_manager.timer_tick_occurred();
last_freshness_call = Instant::now();
last_freshness_call = $get_timer(FRESHNESS_TIMER);
}
if await_time > Duration::from_secs(1) {
if await_slow {
// On various platforms, we may be starved of CPU cycles for several reasons.
// E.g. on iOS, if we've been in the background, we will be entirely paused.
// Similarly, if we're on a desktop platform and the device has been asleep, we
Expand All @@ -271,40 +284,46 @@ macro_rules! define_run_body {
// peers.
log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
$peer_manager.disconnect_all_peers();
last_ping_call = Instant::now();
} else if last_ping_call.elapsed().as_secs() > PING_TIMER {
last_ping_call = $get_timer(PING_TIMER);
} else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
$peer_manager.timer_tick_occurred();
last_ping_call = Instant::now();
last_ping_call = $get_timer(PING_TIMER);
}

// Note that we want to run a graph prune once not long after startup before
// falling back to our usual hourly prunes. This avoids short-lived clients never
// pruning their network graph. We run once 60 seconds after startup before
// continuing our normal cadence.
if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
if $timer_elapsed(&mut last_prune_call, if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }) {
// The network graph must not be pruned while rapid sync completion is pending
if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
log_trace!($logger, "Pruning and persisting network graph.");
network_graph.remove_stale_channels_and_tracking();
#[cfg(feature = "std")] {
log_trace!($logger, "Pruning and persisting network graph.");
network_graph.remove_stale_channels_and_tracking();
}
#[cfg(not(feature = "std"))] {
log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
log_trace!($logger, "Persisting network graph.");
}

if let Err(e) = $persister.persist_graph(network_graph) {
log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
}

last_prune_call = Instant::now();
last_prune_call = $get_timer(NETWORK_PRUNE_TIMER);
have_pruned = true;
}
}

if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
if let Some(ref scorer) = $scorer {
log_trace!($logger, "Persisting scorer");
if let Err(e) = $persister.persist_scorer(&scorer) {
log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
}
}
last_scorer_persist_call = Instant::now();
last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
}
}

Expand Down Expand Up @@ -334,6 +353,11 @@ macro_rules! define_run_body {
/// future which outputs true, the loop will exit and this function's future will complete.
///
/// See [`BackgroundProcessor::start`] for information on which actions this handles.
///
/// Requires the `futures` feature. Note that while this method is available without the `std`
/// feature, doing so will skip calling [`NetworkGraph::remove_stale_channels_and_tracking`],
/// you should call [`NetworkGraph::remove_stale_channels_and_tracking_with_time`] regularly
/// manually instead.
#[cfg(feature = "futures")]
pub async fn process_events_async<
'a,
Expand Down Expand Up @@ -364,13 +388,13 @@ pub async fn process_events_async<
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
S: 'static + Deref<Target = SC> + Send + Sync,
SC: WriteableScore<'a>,
SleepFuture: core::future::Future<Output = bool>,
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
Sleeper: Fn(Duration) -> SleepFuture
>(
persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
sleeper: Sleeper,
) -> Result<(), std::io::Error>
) -> Result<(), io::Error>
where
CA::Target: 'static + chain::Access,
CF::Target: 'static + chain::Filter,
Expand Down Expand Up @@ -411,9 +435,15 @@ where
false
}
}
}, |t| sleeper(Duration::from_secs(t)),
|fut: &mut SleepFuture, _| {
let mut waker = task::noop_waker();
let mut ctx = task::Context::from_waker(&mut waker);
core::pin::Pin::new(fut).poll(&mut ctx).is_ready()
})
}

#[cfg(feature = "std")]
impl BackgroundProcessor {
/// Start a background thread that takes care of responsibilities enumerated in the [top-level
/// documentation].
Expand Down Expand Up @@ -522,7 +552,8 @@ impl BackgroundProcessor {
define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
channel_manager, channel_manager.process_pending_events(&event_handler),
gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
channel_manager.await_persistable_update_timeout(Duration::from_millis(100)),
|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur)
});
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
}
Expand Down Expand Up @@ -568,13 +599,14 @@ impl BackgroundProcessor {
}
}

#[cfg(feature = "std")]
impl Drop for BackgroundProcessor {
fn drop(&mut self) {
self.stop_and_join_thread().unwrap();
}
}

#[cfg(test)]
#[cfg(all(feature = "std", test))]
mod tests {
use bitcoin::blockdata::block::BlockHeader;
use bitcoin::blockdata::constants::genesis_block;
Expand Down
1 change: 1 addition & 0 deletions no-std-check/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ default = ["lightning/no-std", "lightning-invoice/no-std", "lightning-rapid-goss
lightning = { path = "../lightning", default-features = false }
lightning-invoice = { path = "../lightning-invoice", default-features = false }
lightning-rapid-gossip-sync = { path = "../lightning-rapid-gossip-sync", default-features = false }
lightning-background-processor = { path = "../lightning-background-processor", features = ["futures"], default-features = false }