Skip to content

Commit

Permalink
Merge pull request #1962 from TheBlueMatt/2023-01-bp-no-std
Browse files Browse the repository at this point in the history
Use the user-provided `SleepFuture` for interval checks in `background-processor`
  • Loading branch information
TheBlueMatt authored Jan 17, 2023
2 parents e1b58a8 + aa4c6f6 commit c86950d
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 32 deletions.
9 changes: 6 additions & 3 deletions lightning-background-processor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ rustdoc-args = ["--cfg", "docsrs"]

[features]
futures = [ "futures-util" ]
std = ["lightning/std", "lightning-rapid-gossip-sync/std"]

default = ["std"]

[dependencies]
bitcoin = "0.29.0"
lightning = { version = "0.0.113", path = "../lightning", features = ["std"] }
lightning-rapid-gossip-sync = { version = "0.0.113", path = "../lightning-rapid-gossip-sync" }
bitcoin = { version = "0.29.0", default-features = false }
lightning = { version = "0.0.113", path = "../lightning", default-features = false }
lightning-rapid-gossip-sync = { version = "0.0.113", path = "../lightning-rapid-gossip-sync", default-features = false }
futures-util = { version = "0.3", default-features = false, features = ["async-await-macro"], optional = true }

[dev-dependencies]
Expand Down
90 changes: 61 additions & 29 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@

#![cfg_attr(docsrs, feature(doc_auto_cfg))]

#![cfg_attr(all(not(feature = "std"), not(test)), no_std)]

#[cfg(any(test, feature = "std"))]
extern crate core;

#[macro_use] extern crate lightning;
extern crate lightning_rapid_gossip_sync;

Expand All @@ -28,15 +33,22 @@ use lightning::util::events::{Event, EventHandler, EventsProvider};
use lightning::util::logger::Logger;
use lightning::util::persist::Persister;
use lightning_rapid_gossip_sync::RapidGossipSync;
use lightning::io;

use core::ops::Deref;
use core::time::Duration;

#[cfg(feature = "std")]
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
use std::ops::Deref;
#[cfg(feature = "std")]
use core::sync::atomic::{AtomicBool, Ordering};
#[cfg(feature = "std")]
use std::thread::{self, JoinHandle};
#[cfg(feature = "std")]
use std::time::Instant;

#[cfg(feature = "futures")]
use futures_util::{select_biased, future::FutureExt};
use futures_util::{select_biased, future::FutureExt, task};

/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
Expand All @@ -62,6 +74,7 @@ use futures_util::{select_biased, future::FutureExt};
///
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::util::events::Event
#[cfg(feature = "std")]
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
stop_thread: Arc<AtomicBool>,
Expand Down Expand Up @@ -207,15 +220,15 @@ macro_rules! define_run_body {
($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
$channel_manager: ident, $process_channel_manager_events: expr,
$gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
$loop_exit_check: expr, $await: expr)
$loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr)
=> { {
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
$channel_manager.timer_tick_occurred();

let mut last_freshness_call = Instant::now();
let mut last_ping_call = Instant::now();
let mut last_prune_call = Instant::now();
let mut last_scorer_persist_call = Instant::now();
let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
let mut last_ping_call = $get_timer(PING_TIMER);
let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
let mut have_pruned = false;

loop {
Expand All @@ -237,9 +250,9 @@ macro_rules! define_run_body {

// We wait up to 100ms, but track how long it takes to detect being put to sleep,
// see `await_start`'s use below.
let await_start = Instant::now();
let mut await_start = $get_timer(1);
let updates_available = $await;
let await_time = await_start.elapsed();
let await_slow = $timer_elapsed(&mut await_start, 1);

if updates_available {
log_trace!($logger, "Persisting ChannelManager...");
Expand All @@ -251,12 +264,12 @@ macro_rules! define_run_body {
log_trace!($logger, "Terminating background processor.");
break;
}
if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
$channel_manager.timer_tick_occurred();
last_freshness_call = Instant::now();
last_freshness_call = $get_timer(FRESHNESS_TIMER);
}
if await_time > Duration::from_secs(1) {
if await_slow {
// On various platforms, we may be starved of CPU cycles for several reasons.
// E.g. on iOS, if we've been in the background, we will be entirely paused.
// Similarly, if we're on a desktop platform and the device has been asleep, we
Expand All @@ -271,40 +284,46 @@ macro_rules! define_run_body {
// peers.
log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
$peer_manager.disconnect_all_peers();
last_ping_call = Instant::now();
} else if last_ping_call.elapsed().as_secs() > PING_TIMER {
last_ping_call = $get_timer(PING_TIMER);
} else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
$peer_manager.timer_tick_occurred();
last_ping_call = Instant::now();
last_ping_call = $get_timer(PING_TIMER);
}

// Note that we want to run a graph prune once not long after startup before
// falling back to our usual hourly prunes. This avoids short-lived clients never
// pruning their network graph. We run once 60 seconds after startup before
// continuing our normal cadence.
if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
if $timer_elapsed(&mut last_prune_call, if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }) {
// The network graph must not be pruned while rapid sync completion is pending
if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
log_trace!($logger, "Pruning and persisting network graph.");
network_graph.remove_stale_channels_and_tracking();
#[cfg(feature = "std")] {
log_trace!($logger, "Pruning and persisting network graph.");
network_graph.remove_stale_channels_and_tracking();
}
#[cfg(not(feature = "std"))] {
log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
log_trace!($logger, "Persisting network graph.");
}

if let Err(e) = $persister.persist_graph(network_graph) {
log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
}

last_prune_call = Instant::now();
last_prune_call = $get_timer(NETWORK_PRUNE_TIMER);
have_pruned = true;
}
}

if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
if let Some(ref scorer) = $scorer {
log_trace!($logger, "Persisting scorer");
if let Err(e) = $persister.persist_scorer(&scorer) {
log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
}
}
last_scorer_persist_call = Instant::now();
last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
}
}

Expand Down Expand Up @@ -334,6 +353,11 @@ macro_rules! define_run_body {
/// future which outputs true, the loop will exit and this function's future will complete.
///
/// See [`BackgroundProcessor::start`] for information on which actions this handles.
///
/// Requires the `futures` feature. Note that while this method is available without the `std`
/// feature, doing so will skip calling [`NetworkGraph::remove_stale_channels_and_tracking`],
/// you should call [`NetworkGraph::remove_stale_channels_and_tracking_with_time`] regularly
/// manually instead.
#[cfg(feature = "futures")]
pub async fn process_events_async<
'a,
Expand Down Expand Up @@ -364,13 +388,13 @@ pub async fn process_events_async<
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
S: 'static + Deref<Target = SC> + Send + Sync,
SC: WriteableScore<'a>,
SleepFuture: core::future::Future<Output = bool>,
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
Sleeper: Fn(Duration) -> SleepFuture
>(
persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
sleeper: Sleeper,
) -> Result<(), std::io::Error>
) -> Result<(), io::Error>
where
CA::Target: 'static + chain::Access,
CF::Target: 'static + chain::Filter,
Expand Down Expand Up @@ -411,9 +435,15 @@ where
false
}
}
}, |t| sleeper(Duration::from_secs(t)),
|fut: &mut SleepFuture, _| {
let mut waker = task::noop_waker();
let mut ctx = task::Context::from_waker(&mut waker);
core::pin::Pin::new(fut).poll(&mut ctx).is_ready()
})
}

#[cfg(feature = "std")]
impl BackgroundProcessor {
/// Start a background thread that takes care of responsibilities enumerated in the [top-level
/// documentation].
Expand Down Expand Up @@ -522,7 +552,8 @@ impl BackgroundProcessor {
define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
channel_manager, channel_manager.process_pending_events(&event_handler),
gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
channel_manager.await_persistable_update_timeout(Duration::from_millis(100)),
|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur)
});
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
}
Expand Down Expand Up @@ -568,13 +599,14 @@ impl BackgroundProcessor {
}
}

#[cfg(feature = "std")]
impl Drop for BackgroundProcessor {
fn drop(&mut self) {
self.stop_and_join_thread().unwrap();
}
}

#[cfg(test)]
#[cfg(all(feature = "std", test))]
mod tests {
use bitcoin::blockdata::block::BlockHeader;
use bitcoin::blockdata::constants::genesis_block;
Expand Down
1 change: 1 addition & 0 deletions no-std-check/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ default = ["lightning/no-std", "lightning-invoice/no-std", "lightning-rapid-goss
lightning = { path = "../lightning", default-features = false }
lightning-invoice = { path = "../lightning-invoice", default-features = false }
lightning-rapid-gossip-sync = { path = "../lightning-rapid-gossip-sync", default-features = false }
lightning-background-processor = { path = "../lightning-background-processor", features = ["futures"], default-features = false }

0 comments on commit c86950d

Please sign in to comment.