Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Peer storage feature #2943

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions fuzz/src/chanmon_consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,10 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, Option<PublicKey>)> {
return self.chain_monitor.release_pending_monitor_events();
}

fn get_stub_cids_with_counterparty(&self, counterparty_node_id: PublicKey) -> Vec<ChannelId> {
	// Pure delegation: the fuzz harness's `TestChainMonitor` forwards straight to the
	// wrapped `chain_monitor`, adding no behavior of its own.
	self.chain_monitor.get_stub_cids_with_counterparty(counterparty_node_id)
}
}

struct KeyProvider {
Expand Down
5 changes: 5 additions & 0 deletions lightning-net-tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,11 @@ mod tests {
fn handle_tx_init_rbf(&self, _their_node_id: PublicKey, _msg: &TxInitRbf) {}
fn handle_tx_ack_rbf(&self, _their_node_id: PublicKey, _msg: &TxAckRbf) {}
fn handle_tx_abort(&self, _their_node_id: PublicKey, _msg: &TxAbort) {}
// No-op: this test `ChannelMessageHandler` only tracks connect/disconnect, so
// incoming `peer_storage` messages are deliberately ignored.
fn handle_peer_storage(&self, _their_node_id: PublicKey, _msg: &PeerStorageMessage) {}
// No-op: `your_peer_storage` (retrieved backup blob) is likewise irrelevant to this
// connectivity-focused test handler.
fn handle_your_peer_storage(
	&self, _their_node_id: PublicKey, _msg: &YourPeerStorageMessage,
) {
}
fn peer_disconnected(&self, their_node_id: PublicKey) {
if their_node_id == self.expected_pubkey {
self.disconnected_flag.store(true, Ordering::SeqCst);
Expand Down
24 changes: 22 additions & 2 deletions lightning-types/src/features.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@
//! (see the [`Trampoline` feature proposal](https://github.com/lightning/bolts/pull/836) for more information).
//! - `DnsResolver` - supports resolving DNS names to TXT DNSSEC proofs for BIP 353 payments
//! (see [bLIP 32](https://github.com/lightning/blips/blob/master/blip-0032.md) for more information).
//! - `ProvidePeerBackupStorage` - Indicates that we offer the capability to store data of our peers
//! (see [BOLTs PR #1110](https://github.com/lightning/bolts/pull/1110) for more information).
//!
//! LDK knows about the following features, but does not support them:
//! - `AnchorsNonzeroFeeHtlcTx` - the initial version of anchor outputs, which was later found to be
Expand Down Expand Up @@ -150,7 +152,7 @@ mod sealed {
// Byte 4
OnionMessages,
// Byte 5
ChannelType | SCIDPrivacy,
ProvidePeerBackupStorage | ChannelType | SCIDPrivacy,
// Byte 6
ZeroConf,
// Byte 7
Expand All @@ -171,7 +173,7 @@ mod sealed {
// Byte 4
OnionMessages,
// Byte 5
ChannelType | SCIDPrivacy,
ProvidePeerBackupStorage | ChannelType | SCIDPrivacy,
// Byte 6
ZeroConf | Keysend,
// Byte 7
Expand Down Expand Up @@ -522,6 +524,16 @@ mod sealed {
supports_onion_messages,
requires_onion_messages
);
// Bit 43 is the *optional* `provide_peer_backup_storage` flag; by feature-bit
// convention the paired required bit is 42 — presumably tracking
// https://github.com/lightning/bolts/pull/1110 (TODO confirm the final assigned bits).
// NOTE(review): the accessor names (`supports_provide_peer_storage` /
// `requires_provide_peer_storage`) omit "backup" while the setters
// (`set_provide_peer_backup_storage_*`) include it — consider aligning before merge.
define_feature!(
	43,
	ProvidePeerBackupStorage,
	[InitContext, NodeContext],
	"Feature flags for `provide_peer_backup_storage`.",
	set_provide_peer_backup_storage_optional,
	set_provide_peer_backup_storage_required,
	supports_provide_peer_storage,
	requires_provide_peer_storage
);
define_feature!(
45,
ChannelType,
Expand Down Expand Up @@ -1104,6 +1116,14 @@ mod tests {
assert!(!features1.requires_unknown_bits_from(&features2));
assert!(!features2.requires_unknown_bits_from(&features1));

features1.set_provide_peer_backup_storage_required();
assert!(features1.requires_unknown_bits_from(&features2));
assert!(!features2.requires_unknown_bits_from(&features1));

features2.set_provide_peer_backup_storage_optional();
assert!(!features1.requires_unknown_bits_from(&features2));
assert!(!features2.requires_unknown_bits_from(&features1));

features1.set_data_loss_protect_required();
assert!(features1.requires_unknown_bits_from(&features2));
assert!(!features2.requires_unknown_bits_from(&features1));
Expand Down
163 changes: 91 additions & 72 deletions lightning/src/chain/chainmonitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ pub trait Persist<ChannelSigner: EcdsaChannelSigner> {
fn archive_persisted_channel(&self, channel_funding_outpoint: OutPoint);
}

struct MonitorHolder<ChannelSigner: EcdsaChannelSigner> {
monitor: ChannelMonitor<ChannelSigner>,
pub(crate) struct MonitorHolder<ChannelSigner: EcdsaChannelSigner> {
pub(crate) monitor: ChannelMonitor<ChannelSigner>,
/// The full set of pending monitor updates for this Channel.
///
/// Note that this lock must be held from [`ChannelMonitor::update_monitor`] through to
Expand All @@ -181,7 +181,7 @@ struct MonitorHolder<ChannelSigner: EcdsaChannelSigner> {
/// could cause users to have a full [`ChannelMonitor`] on disk as well as a
/// [`ChannelMonitorUpdate`] which was already applied. While this isn't an issue for the
/// LDK-provided update-based [`Persist`], it is somewhat surprising for users so we avoid it.
pending_monitor_updates: Mutex<Vec<u64>>,
pub(crate) pending_monitor_updates: Mutex<Vec<u64>>,
}

impl<ChannelSigner: EcdsaChannelSigner> MonitorHolder<ChannelSigner> {
Expand All @@ -195,8 +195,8 @@ impl<ChannelSigner: EcdsaChannelSigner> MonitorHolder<ChannelSigner> {
/// Note that this holds a mutex in [`ChainMonitor`] and may block other events until it is
/// released.
pub struct LockedChannelMonitor<'a, ChannelSigner: EcdsaChannelSigner> {
lock: RwLockReadGuard<'a, HashMap<OutPoint, MonitorHolder<ChannelSigner>>>,
funding_txo: OutPoint,
pub(crate) lock: RwLockReadGuard<'a, HashMap<OutPoint, MonitorHolder<ChannelSigner>>>,
pub(crate) funding_txo: OutPoint,
}

impl<ChannelSigner: EcdsaChannelSigner> Deref for LockedChannelMonitor<'_, ChannelSigner> {
Expand Down Expand Up @@ -230,6 +230,7 @@ pub struct ChainMonitor<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F
P::Target: Persist<ChannelSigner>,
{
monitors: RwLock<HashMap<OutPoint, MonitorHolder<ChannelSigner>>>,

chain_source: Option<C>,
broadcaster: T,
logger: L,
Expand All @@ -246,73 +247,19 @@ pub struct ChainMonitor<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F
event_notifier: Notifier,
}

impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P>
where C::Target: chain::Filter,
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
P::Target: Persist<ChannelSigner>,
{
/// Dispatches to per-channel monitors, which are responsible for updating their on-chain view
/// of a channel and reacting accordingly based on transactions in the given chain data. See
/// [`ChannelMonitor::block_connected`] for details. Any HTLCs that were resolved on chain will
/// be returned by [`chain::Watch::release_pending_monitor_events`].
///
/// Calls back to [`chain::Filter`] if any monitor indicated new outputs to watch. Subsequent
/// calls must not exclude any transactions matching the new outputs nor any in-block
/// descendants of such transactions. It is not necessary to re-fetch the block to obtain
/// updated `txdata`.
///
/// Calls which represent a new blockchain tip height should set `best_height`.
fn process_chain_data<FN>(&self, header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN)
pub(crate) fn update_monitor_with_chain_data_util <FN, P: Deref, ChannelSigner, C:Deref, L:Deref>(
persister: &P, chain_source: &Option<C>, logger: &L, header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN, funding_outpoint: &OutPoint,
monitor_state: &MonitorHolder<ChannelSigner>, channel_count: usize,
) -> Result<(), ()>
where
FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
C::Target: chain::Filter,
FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>,
P::Target: Persist<ChannelSigner>,
L::Target: Logger,
ChannelSigner: EcdsaChannelSigner,
{
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
let funding_outpoints = hash_set_from_iter(self.monitors.read().unwrap().keys().cloned());
let channel_count = funding_outpoints.len();
for funding_outpoint in funding_outpoints.iter() {
let monitor_lock = self.monitors.read().unwrap();
if let Some(monitor_state) = monitor_lock.get(funding_outpoint) {
if self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state, channel_count).is_err() {
// Take the monitors lock for writing so that we poison it and any future
// operations going forward fail immediately.
core::mem::drop(monitor_lock);
let _poison = self.monitors.write().unwrap();
log_error!(self.logger, "{}", err_str);
panic!("{}", err_str);
}
}
}

// do some followup cleanup if any funding outpoints were added in between iterations
let monitor_states = self.monitors.write().unwrap();
for (funding_outpoint, monitor_state) in monitor_states.iter() {
if !funding_outpoints.contains(funding_outpoint) {
if self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state, channel_count).is_err() {
log_error!(self.logger, "{}", err_str);
panic!("{}", err_str);
}
}
}

if let Some(height) = best_height {
// If the best block height is being updated, update highest_chain_height under the
// monitors write lock.
let old_height = self.highest_chain_height.load(Ordering::Acquire);
let new_height = height as usize;
if new_height > old_height {
self.highest_chain_height.store(new_height, Ordering::Release);
}
}
}

fn update_monitor_with_chain_data<FN>(
&self, header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN, funding_outpoint: &OutPoint,
monitor_state: &MonitorHolder<ChannelSigner>, channel_count: usize,
) -> Result<(), ()> where FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs> {
let monitor = &monitor_state.monitor;
let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
let logger = WithChannelMonitor::from(logger, &monitor, None);

let mut txn_outputs = process(monitor, txdata);

Expand All @@ -337,7 +284,7 @@ where C::Target: chain::Filter,
// `ChannelMonitorUpdate` after a channel persist for a channel with the same
// `latest_update_id`.
let _pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
match self.persister.update_persisted_channel(*funding_outpoint, None, monitor) {
match persister.update_persisted_channel(*funding_outpoint, None, monitor) {
ChannelMonitorUpdateStatus::Completed =>
log_trace!(logger, "Finished syncing Channel Monitor for channel {} for block-data",
log_funding_info!(monitor)
Expand All @@ -353,7 +300,7 @@ where C::Target: chain::Filter,

// Register any new outputs with the chain source for filtering, storing any dependent
// transactions from within the block that previously had not been included in txdata.
if let Some(ref chain_source) = self.chain_source {
if let Some(ref chain_source_ref) = chain_source {
let block_hash = header.block_hash();
for (txid, mut outputs) in txn_outputs.drain(..) {
for (idx, output) in outputs.drain(..) {
Expand All @@ -364,13 +311,85 @@ where C::Target: chain::Filter,
script_pubkey: output.script_pubkey,
};
log_trace!(logger, "Adding monitoring for spends of outpoint {} to the filter", output.outpoint);
chain_source.register_output(output);
chain_source_ref.register_output(output);
}
}
}
Ok(())
}

/// Free-function form of `ChainMonitor::process_chain_data`: applies `process` to every
/// [`ChannelMonitor`] held in `monitors`, persisting each one via `persister` and (through
/// `update_monitor_with_chain_data_util`) registering newly-watched outputs with
/// `chain_source`.
///
/// If persistence fails unrecoverably this poisons the `monitors` lock (where possible)
/// and panics, since continuing without durable monitor state is unsafe.
pub(crate) fn process_chain_data_util<FN, ChannelSigner: EcdsaChannelSigner, L: Deref, P: Deref, C: Deref>(persister: &P, chain_source: &Option<C>,
	logger: &L, monitors: &RwLock<HashMap<OutPoint, MonitorHolder<ChannelSigner>>>, highest_chain_height: &AtomicUsize,
	header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN)
where
	FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>,
	L::Target: Logger,
	P::Target: Persist<ChannelSigner>,
	C::Target: chain::Filter,
{
	let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
	// Snapshot the set of funding outpoints up front; monitors added while we iterate are
	// handled by the follow-up pass below.
	let funding_outpoints = hash_set_from_iter(monitors.read().unwrap().keys().cloned());
	let channel_count = funding_outpoints.len();
	for funding_outpoint in funding_outpoints.iter() {
		// Re-take the read lock each iteration so we never hold it across the whole pass.
		let monitor_lock = monitors.read().unwrap();
		if let Some(monitor_state) = monitor_lock.get(funding_outpoint) {
			if update_monitor_with_chain_data_util(persister, chain_source, logger, header, best_height, txdata, &process, funding_outpoint, &monitor_state, channel_count).is_err() {
				// Take the monitors lock for writing so that we poison it and any future
				// operations going forward fail immediately.
				core::mem::drop(monitor_lock);
				let _poison = monitors.write().unwrap();
				log_error!(logger, "{}", err_str);
				panic!("{}", err_str);
			}
		}
	}

	// do some followup cleanup if any funding outpoints were added in between iterations
	let monitor_states = monitors.write().unwrap();
	for (funding_outpoint, monitor_state) in monitor_states.iter() {
		if !funding_outpoints.contains(funding_outpoint) {
			if update_monitor_with_chain_data_util(persister, chain_source, logger, header, best_height, txdata, &process, funding_outpoint, &monitor_state, channel_count).is_err() {
				log_error!(logger, "{}", err_str);
				panic!("{}", err_str);
			}
		}
	}

	if let Some(height) = best_height {
		// If the best block height is being updated, update highest_chain_height under the
		// monitors write lock.
		let old_height = highest_chain_height.load(Ordering::Acquire);
		let new_height = height as usize;
		// Only ever move the recorded height forward; stale/lower tips are ignored.
		if new_height > old_height {
			highest_chain_height.store(new_height, Ordering::Release);
		}
	}
}
impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P>
where C::Target: chain::Filter,
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
P::Target: Persist<ChannelSigner>,
{
/// Dispatches to per-channel monitors, which are responsible for updating their on-chain view
/// of a channel and reacting accordingly based on transactions in the given chain data. See
/// [`ChannelMonitor::block_connected`] for details. Any HTLCs that were resolved on chain will
/// be returned by [`chain::Watch::release_pending_monitor_events`].
///
/// Calls back to [`chain::Filter`] if any monitor indicated new outputs to watch. Subsequent
/// calls must not exclude any transactions matching the new outputs nor any in-block
/// descendants of such transactions. It is not necessary to re-fetch the block to obtain
/// updated `txdata`.
///
/// Calls which represent a new blockchain tip height should set `best_height`.
fn process_chain_data<FN>(&self, header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN)
where
	FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>,
{
	// Thin wrapper: all the work lives in the free function so it can be reused
	// without a full `ChainMonitor` (only the fields it needs are passed in).
	process_chain_data_util(&self.persister, &self.chain_source, &self.logger, &self.monitors, &self.highest_chain_height, header, best_height, txdata, process);
}

/// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels.
///
/// When an optional chain source implementing [`chain::Filter`] is provided, the chain monitor
Expand Down
Loading
Loading