From f9586572c8560bf677fcddf53e5439c60d2fb939 Mon Sep 17 00:00:00 2001
From: Aditya Sharma
Date: Sun, 14 Jul 2024 01:04:05 +0530
Subject: [PATCH 01/11] lightning: Add provide_peer_backup_storage to
 InitContext and NodeContext.

---
 lightning-types/src/features.rs    | 24 ++++++++++++++++++++++--
 lightning/src/ln/channelmanager.rs |  1 +
 2 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/lightning-types/src/features.rs b/lightning-types/src/features.rs
index 036ac4e84ba..5749e8a318b 100644
--- a/lightning-types/src/features.rs
+++ b/lightning-types/src/features.rs
@@ -70,6 +70,8 @@
 //! (see the [`Trampoline` feature proposal](https://github.com/lightning/bolts/pull/836) for more information).
 //! - `DnsResolver` - supports resolving DNS names to TXT DNSSEC proofs for BIP 353 payments
 //! (see [bLIP 32](https://github.com/lightning/blips/blob/master/blip-0032.md) for more information).
+//! - `ProvidePeerBackupStorage` - indicates that we offer to store our peers' backup data
+//! (see [BOLTs PR #1110](https://github.com/lightning/bolts/pull/1110) for more information).
 //!
 //! LDK knows about the following features, but does not support them:
 //! - `AnchorsNonzeroFeeHtlcTx` - the initial version of anchor outputs, which was later found to be
@@ -150,7 +152,7 @@ mod sealed {
 		// Byte 4
 		OnionMessages,
 		// Byte 5
-		ChannelType | SCIDPrivacy,
+		ProvidePeerBackupStorage | ChannelType | SCIDPrivacy,
 		// Byte 6
 		ZeroConf,
 		// Byte 7
@@ -171,7 +173,7 @@ mod sealed {
 		// Byte 4
 		OnionMessages,
 		// Byte 5
-		ChannelType | SCIDPrivacy,
+		ProvidePeerBackupStorage | ChannelType | SCIDPrivacy,
 		// Byte 6
 		ZeroConf | Keysend,
 		// Byte 7
@@ -522,6 +524,16 @@ mod sealed {
 		supports_onion_messages,
 		requires_onion_messages
 	);
+	define_feature!(
+		43,
+		ProvidePeerBackupStorage,
+		[InitContext, NodeContext],
+		"Feature flags for `provide_peer_backup_storage`.",
+		set_provide_peer_backup_storage_optional,
+		set_provide_peer_backup_storage_required,
+		supports_provide_peer_storage,
+		requires_provide_peer_storage
+	);
 	define_feature!(
 		45,
 		ChannelType,
@@ -1104,6 +1116,14 @@ mod tests {
 		assert!(!features1.requires_unknown_bits_from(&features2));
 		assert!(!features2.requires_unknown_bits_from(&features1));

+		features1.set_provide_peer_backup_storage_required();
+		assert!(features1.requires_unknown_bits_from(&features2));
+		assert!(!features2.requires_unknown_bits_from(&features1));
+
+		features2.set_provide_peer_backup_storage_optional();
+		assert!(!features1.requires_unknown_bits_from(&features2));
+		assert!(!features2.requires_unknown_bits_from(&features1));
+
 		features1.set_data_loss_protect_required();
 		assert!(features1.requires_unknown_bits_from(&features2));
 		assert!(!features2.requires_unknown_bits_from(&features1));
diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index bdf67b5da7c..2fea97c3fd6 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -11415,6 +11415,7 @@ pub fn provided_init_features(config: &UserConfig) -> InitFeatures {
 	features.set_scid_privacy_optional();
 	features.set_zero_conf_optional();
 	features.set_route_blinding_optional();
+	features.set_provide_peer_backup_storage_optional();
 	if config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx {
 		features.set_anchors_zero_fee_htlc_tx_optional();
 	}

From ad133980cbe01be33096740acb238d5e98de031f Mon Sep 17 00:00:00 2001
From: Aditya Sharma
Date: Sun, 14 Jul 2024 01:26:17 +0530
Subject: [PATCH 02/11] lightning: Add message types and their handlers for
 peer storage messages.
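
The two new messages use the odd type numbers 7 and 9, so per BOLT 1's
"it's OK to be odd" rule, peers which do not understand them will simply
ignore them. As an illustrative sketch (not part of this patch), the new
structs round-trip through LDK's usual `Writeable` machinery, with the
`Vec<u8>` payload serialized as a two-byte big-endian length prefix
followed by the raw bytes, matching the unit tests added below:

    // Illustrative sketch only.
    let msg = msgs::PeerStorageMessage { data: vec![0x01, 0x02, 0x03] };
    let encoded = msg.encode();
    // 2-byte length prefix (0x0003) followed by the payload bytes.
    assert_eq!(encoded, vec![0x00, 0x03, 0x01, 0x02, 0x03]);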
---
 lightning-net-tokio/src/lib.rs            |  5 +++
 lightning/src/events/mod.rs               | 17 ++++++++
 lightning/src/ln/channelmanager.rs        | 10 +++++
 lightning/src/ln/functional_test_utils.rs |  6 +++
 lightning/src/ln/msgs.rs                  | 52 +++++++++++++++++++++++
 lightning/src/ln/peer_handler.rs          | 16 +++++++
 lightning/src/ln/wire.rs                  | 20 +++++++++
 lightning/src/util/test_utils.rs          |  8 ++++
 8 files changed, 134 insertions(+)

diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs
index 89ac7a52ec2..e1d52f12f51 100644
--- a/lightning-net-tokio/src/lib.rs
+++ b/lightning-net-tokio/src/lib.rs
@@ -760,6 +760,11 @@ mod tests {
 		fn handle_tx_init_rbf(&self, _their_node_id: PublicKey, _msg: &TxInitRbf) {}
 		fn handle_tx_ack_rbf(&self, _their_node_id: PublicKey, _msg: &TxAckRbf) {}
 		fn handle_tx_abort(&self, _their_node_id: PublicKey, _msg: &TxAbort) {}
+		fn handle_peer_storage(&self, _their_node_id: PublicKey, _msg: &PeerStorageMessage) {}
+		fn handle_your_peer_storage(
+			&self, _their_node_id: PublicKey, _msg: &YourPeerStorageMessage,
+		) {
+		}
 		fn peer_disconnected(&self, their_node_id: PublicKey) {
 			if their_node_id == self.expected_pubkey {
 				self.disconnected_flag.store(true, Ordering::SeqCst);
diff --git a/lightning/src/events/mod.rs b/lightning/src/events/mod.rs
index 4a179f43985..24748a096b3 100644
--- a/lightning/src/events/mod.rs
+++ b/lightning/src/events/mod.rs
@@ -2574,6 +2574,23 @@ pub enum MessageSendEvent {
 		/// The gossip_timestamp_filter which should be sent.
 		msg: msgs::GossipTimestampFilter,
 	},
+	/// Sends our peer storage blob to a channel partner, which they should store for us.
+	/// This should be sent on each new connection to the channel partner, and whenever we want
+	/// them to update the backup that they store for us.
+	SendPeerStorageMessage {
+		/// The node_id of this message recipient
+		node_id: PublicKey,
+		/// The PeerStorageMessage which should be sent.
+		msg: msgs::PeerStorageMessage,
+	},
+	/// Sends a channel partner their own peer storage blob, which we store and update whenever
+	/// they send us a [`msgs::PeerStorageMessage`].
+	SendYourPeerStorageMessage {
+		/// The node_id of this message recipient
+		node_id: PublicKey,
+		/// The YourPeerStorageMessage which should be sent.
+		msg: msgs::YourPeerStorageMessage,
+	},
 }

 /// A trait indicating an object may generate message send events
diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index 2fea97c3fd6..32cf8412c8c 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -10471,6 +10471,12 @@ where
 		let _ = handle_error!(self, self.internal_funding_signed(&counterparty_node_id, msg), counterparty_node_id);
 	}

+	fn handle_peer_storage(&self, counterparty_node_id: PublicKey, msg: &msgs::PeerStorageMessage) {
+	}
+
+	fn handle_your_peer_storage(&self, counterparty_node_id: PublicKey, msg: &msgs::YourPeerStorageMessage) {
+	}
+
 	fn handle_channel_ready(&self, counterparty_node_id: PublicKey, msg: &msgs::ChannelReady) {
 		// Note that we never need to persist the updated ChannelManager for an inbound
 		// channel_ready message - while the channel's state will change, any channel_ready message
@@ -10736,6 +10742,10 @@ where
 					&events::MessageSendEvent::SendShortIdsQuery { .. } => false,
 					&events::MessageSendEvent::SendReplyChannelRange { .. } => false,
 					&events::MessageSendEvent::SendGossipTimestampFilter { .. } => false,
+
+					// Peer Storage
+					&events::MessageSendEvent::SendPeerStorageMessage { .. } => false,
+					&events::MessageSendEvent::SendYourPeerStorageMessage { ..
} => false,
 				}
 			});
 			debug_assert!(peer_state.is_connected, "A disconnected peer cannot disconnect");
diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs
index 7776966b285..6ae52d736d6 100644
--- a/lightning/src/ln/functional_test_utils.rs
+++ b/lightning/src/ln/functional_test_utils.rs
@@ -854,6 +854,12 @@ macro_rules! get_htlc_update_msgs {
 /// such messages are intended to all peers.
 pub fn remove_first_msg_event_to_node(msg_node_id: &PublicKey, msg_events: &mut Vec<MessageSendEvent>) -> MessageSendEvent {
 	let ev_index = msg_events.iter().position(|e| { match e {
+		MessageSendEvent::SendPeerStorageMessage { node_id, .. } => {
+			node_id == msg_node_id
+		},
+		MessageSendEvent::SendYourPeerStorageMessage { node_id, .. } => {
+			node_id == msg_node_id
+		},
 		MessageSendEvent::SendAcceptChannel { node_id, .. } => {
 			node_id == msg_node_id
 		},
diff --git a/lightning/src/ln/msgs.rs b/lightning/src/ln/msgs.rs
index be5ecb27ae0..b56c65014cc 100644
--- a/lightning/src/ln/msgs.rs
+++ b/lightning/src/ln/msgs.rs
@@ -723,6 +723,24 @@ pub struct UpdateFulfillHTLC {
 	pub payment_preimage: PaymentPreimage,
 }

+/// A [`PeerStorage`] message to be sent to or received from a peer.
+///
+/// [`PeerStorage`]: https://github.com/lightning/bolts/pull/1110
+#[derive(Clone, Debug, Hash, PartialEq, Eq)]
+pub struct PeerStorageMessage {
+	/// The data contained in the message.
+	pub data: Vec<u8>,
+}
+
+/// A [`YourPeerStorage`] message to be sent to or received from a peer.
+///
+/// [`YourPeerStorage`]: https://github.com/lightning/bolts/pull/1110
+#[derive(Clone, Debug, Hash, PartialEq, Eq)]
+pub struct YourPeerStorageMessage {
+	/// The data contained in the message.
+	pub data: Vec<u8>,
+}
+
 /// An [`update_fail_htlc`] message to be sent to or received from a peer.
 ///
 /// [`update_fail_htlc`]: https://github.com/lightning/bolts/blob/master/02-peer-protocol.md#removing-an-htlc-update_fulfill_htlc-update_fail_htlc-and-update_fail_malformed_htlc
@@ -1505,6 +1523,12 @@ pub trait ChannelMessageHandler : MessageSendEventsProvider {
 	/// Handle an incoming `channel_ready` message from the given peer.
 	fn handle_channel_ready(&self, their_node_id: PublicKey, msg: &ChannelReady);

+	// Peer Storage
+	/// Handle an incoming `peer_storage` message from the given peer.
+	fn handle_peer_storage(&self, their_node_id: PublicKey, msg: &PeerStorageMessage);
+	/// Handle an incoming `your_peer_storage` message from the given peer.
+	fn handle_your_peer_storage(&self, their_node_id: PublicKey, msg: &YourPeerStorageMessage);
+
 	// Channel close:
 	/// Handle an incoming `shutdown` message from the given peer.
 	fn handle_shutdown(&self, their_node_id: PublicKey, msg: &Shutdown);
@@ -2597,6 +2621,14 @@ impl_writeable_msg!(UpdateFulfillHTLC, {
 	payment_preimage
 }, {});

+impl_writeable_msg!(PeerStorageMessage, {
+	data
+}, {});
+
+impl_writeable_msg!(YourPeerStorageMessage, {
+	data
+}, {});
+
 // Note that this is written as a part of ChannelManager objects, and thus cannot change its
 // serialization format in a way which assumes we know the total serialized length/message end
 // position.
@@ -4447,6 +4479,26 @@ mod tests {
 		assert_eq!(encoded_value, target_value);
 	}

+	#[test]
+	fn encoding_peer_storage() {
+		let peerstorage = msgs::PeerStorageMessage {
+			data: <Vec<u8>>::from_hex("01020304050607080910").unwrap()
+		};
+		let encoded_value = peerstorage.encode();
+		let target_value = <Vec<u8>>::from_hex("000a01020304050607080910").unwrap();
+		assert_eq!(encoded_value, target_value);
+	}
+
+	#[test]
+	fn encoding_your_peer_storage() {
+		let yourpeerstorage = msgs::YourPeerStorageMessage {
+			data: <Vec<u8>>::from_hex("01020304050607080910").unwrap()
+		};
+		let encoded_value = yourpeerstorage.encode();
+		let target_value = <Vec<u8>>::from_hex("000a01020304050607080910").unwrap();
+		assert_eq!(encoded_value, target_value);
+	}
+
 	#[test]
 	fn encoding_pong() {
 		let pong = msgs::Pong {
diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs
index 64c35835bda..8d66d7e62fe 100644
--- a/lightning/src/ln/peer_handler.rs
+++ b/lightning/src/ln/peer_handler.rs
@@ -323,6 +323,8 @@ impl ChannelMessageHandler for ErroringMessageHandler {
 	}
 	// msgs::ChannelUpdate does not contain the channel_id field, so we just drop them.
 	fn handle_channel_update(&self, _their_node_id: PublicKey, _msg: &msgs::ChannelUpdate) {}
+	fn handle_peer_storage(&self, _their_node_id: PublicKey, _msg: &msgs::PeerStorageMessage) {}
+	fn handle_your_peer_storage(&self, _their_node_id: PublicKey, _msg: &msgs::YourPeerStorageMessage) {}
 	fn peer_disconnected(&self, _their_node_id: PublicKey) {}
 	fn peer_connected(&self, _their_node_id: PublicKey, _init: &msgs::Init, _inbound: bool) -> Result<(), ()> { Ok(()) }
 	fn handle_error(&self, _their_node_id: PublicKey, _msg: &msgs::ErrorMessage) {}
@@ -1813,6 +1815,12 @@
 				self.message_handler.chan_handler.handle_channel_ready(their_node_id, &msg);
 			},
+			wire::Message::PeerStorageMessage(msg) => {
+				self.message_handler.chan_handler.handle_peer_storage(their_node_id, &msg);
+			},
+			wire::Message::YourPeerStorageMessage(msg) => {
+				self.message_handler.chan_handler.handle_your_peer_storage(their_node_id, &msg);
+			},

 			// Quiescence messages:
 			wire::Message::Stfu(msg) => {
@@ -2108,6 +2116,14 @@
+				MessageSendEvent::SendPeerStorageMessage { ref node_id, ref msg } => {
+					log_debug!(self.logger, "Handling SendPeerStorageMessage event in peer_handler for {}", log_pubkey!(node_id));
+					self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+				},
+				MessageSendEvent::SendYourPeerStorageMessage { ref node_id, ref msg } => {
+					log_debug!(self.logger, "Handling SendYourPeerStorageMessage event in peer_handler for {}", log_pubkey!(node_id));
+					self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+				},
 				MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => {
 					log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendAcceptChannel event in peer_handler for node {} for channel {}",
 							log_pubkey!(node_id),
diff --git a/lightning/src/ln/wire.rs b/lightning/src/ln/wire.rs
index 4cf5e21c173..598b9253876 100644
--- a/lightning/src/ln/wire.rs
+++ b/lightning/src/ln/wire.rs
@@ -53,6 +53,8 @@ pub(crate) enum Message<T> where T: core::fmt::Debug + Type + TestEq {
 	Warning(msgs::WarningMessage),
 	Ping(msgs::Ping),
 	Pong(msgs::Pong),
+	PeerStorageMessage(msgs::PeerStorageMessage),
+	YourPeerStorageMessage(msgs::YourPeerStorageMessage),
 	OpenChannel(msgs::OpenChannel),
 	OpenChannelV2(msgs::OpenChannelV2),
 	AcceptChannel(msgs::AcceptChannel),
@@ -111,6 +113,8 @@ impl<T> Writeable for Message<T> where T: core::fmt::Debug + Type + TestEq {
 			&Message::Warning(ref msg) => msg.write(writer),
 			&Message::Ping(ref msg) => msg.write(writer),
 			&Message::Pong(ref msg) => msg.write(writer),
+			&Message::PeerStorageMessage(ref msg) => msg.write(writer),
+			&Message::YourPeerStorageMessage(ref msg) => msg.write(writer),
 			&Message::OpenChannel(ref msg) => msg.write(writer),
 			&Message::OpenChannelV2(ref msg) => msg.write(writer),
 			&Message::AcceptChannel(ref msg) => msg.write(writer),
@@ -169,6 +173,8 @@ impl<T> Type for Message<T> where T: core::fmt::Debug + Type + TestEq {
 			&Message::Warning(ref msg) => msg.type_id(),
 			&Message::Ping(ref msg) => msg.type_id(),
 			&Message::Pong(ref msg) => msg.type_id(),
+			&Message::PeerStorageMessage(ref msg) => msg.type_id(),
+			&Message::YourPeerStorageMessage(ref msg) => msg.type_id(),
 			&Message::OpenChannel(ref msg) => msg.type_id(),
 			&Message::OpenChannelV2(ref msg) => msg.type_id(),
 			&Message::AcceptChannel(ref msg) => msg.type_id(),
@@ -261,6 +267,12 @@ fn do_read(buffer: &mut R, message_type: u16)
 		msgs::Pong::TYPE => {
 			Ok(Message::Pong(Readable::read(buffer)?))
 		},
+		msgs::PeerStorageMessage::TYPE => {
+			Ok(Message::PeerStorageMessage(Readable::read(buffer)?))
+		},
+		msgs::YourPeerStorageMessage::TYPE => {
+			Ok(Message::YourPeerStorageMessage(Readable::read(buffer)?))
+		},
 		msgs::OpenChannel::TYPE => {
 			Ok(Message::OpenChannel(Readable::read(buffer)?))
 		},
@@ -625,6 +637,14 @@ impl Encode for msgs::GossipTimestampFilter {
 	const TYPE: u16 = 265;
 }

+impl Encode for msgs::PeerStorageMessage {
+	const TYPE: u16 = 7;
+}
+
+impl Encode for msgs::YourPeerStorageMessage {
+	const TYPE: u16 = 9;
+}
+
 #[cfg(test)]
 mod tests {
 	use super::*;
diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs
index 12e027d32fc..9f544d94520 100644
--- a/lightning/src/util/test_utils.rs
+++ b/lightning/src/util/test_utils.rs
@@ -917,6 +917,14 @@ impl msgs::ChannelMessageHandler for TestChannelMessageHandler {
 		self.received_msg(wire::Message::TxAbort(msg.clone()));
 	}

+	fn handle_peer_storage(&self, _their_node_id: PublicKey, msg: &msgs::PeerStorageMessage) {
+		self.received_msg(wire::Message::PeerStorageMessage(msg.clone()));
+	}
+
+	fn handle_your_peer_storage(&self, _their_node_id: PublicKey, msg: &msgs::YourPeerStorageMessage) {
+		self.received_msg(wire::Message::YourPeerStorageMessage(msg.clone()));
+	}
+
 	fn message_received(&self) {}
 }

From 6189a7f1987b7adbf3274283197359a36315aae2 Mon Sep 17 00:00:00 2001
From: Aditya Sharma
Date: Sun, 14 Jul 2024 02:51:50 +0530
Subject: [PATCH 03/11] lightning: Handle the peer storage message, persist
 it, and send it back to the respective peer upon reconnection.

---
 lightning/src/ln/channelmanager.rs | 85 +++++++++++++++++++++++++++++-
 1 file changed, 83 insertions(+), 2 deletions(-)

diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index 32cf8412c8c..4ab6f82ae21 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -1168,9 +1168,23 @@ pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
 	/// [`ChannelMessageHandler::peer_connected`] and no corresponding
 	/// [`ChannelMessageHandler::peer_disconnected`].
 	pub is_connected: bool,
+	peer_storage: Vec<u8>,
 }

 impl<SP: Deref> PeerState<SP> where SP::Target: SignerProvider {
+	pub fn new(features: &InitFeatures) -> Self {
+		Self {
+			channel_by_id: new_hash_map(),
+			inbound_channel_request_by_id: new_hash_map(),
+			latest_features: features.clone(),
+			pending_msg_events: Vec::new(),
+			in_flight_monitor_updates: BTreeMap::new(),
+			monitor_update_blocked_actions: BTreeMap::new(),
+			actions_blocking_raa_monitor_updates: BTreeMap::new(),
+			is_connected: true,
+			peer_storage: Vec::new(),
+		}
+	}
 	/// Indicates that a peer meets the criteria where we're ok to remove it from our storage.
 	/// If true is passed for `require_disconnected`, the function will return false if we haven't
 	/// disconnected from the node already, ie. `PeerState::is_connected` is set to `true`.
@@ -2431,7 +2445,7 @@ where
 	entropy_source: ES,
 	node_signer: NS,
 	signer_provider: SP,
-
+	our_peer_storage: FairRwLock<OurPeerStorage>,
 	logger: L,
 }
@@ -3249,7 +3263,7 @@ where
 			entropy_source,
 			node_signer,
 			signer_provider,
-
+			our_peer_storage: FairRwLock::new(OurPeerStorage::new()),
 			logger,
 		}
 	}
@@ -7001,6 +7015,7 @@ where
 			monitor_update_blocked_actions: BTreeMap::new(),
 			actions_blocking_raa_monitor_updates: BTreeMap::new(),
 			is_connected: false,
+			peer_storage: Vec::new(),
 		}));
 		let mut peer_state = peer_state_mutex.lock().unwrap();

@@ -7861,6 +7876,42 @@ where
 		}
 	}

+	fn internal_peer_storage(&self, counterparty_node_id: &PublicKey, msg: &msgs::PeerStorageMessage) {
+		let per_peer_state = self.per_peer_state.read().unwrap();
+		let peer_state_mutex = match per_peer_state.get(counterparty_node_id) {
+			Some(peer_state_mutex) => peer_state_mutex,
+			None => return,
+		};
+		let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+		let peer_state = &mut *peer_state_lock;
+		let logger = WithContext::from(&self.logger, Some(*counterparty_node_id), None);
+
+		// Check if we have any channels with the peer (currently we only provide this service to peers we have a channel with).
+		if !peer_state.channel_by_id.values().any(|phase| matches!(phase, ChannelPhase::Funded(_))) {
+			log_debug!(logger, "We do not have any channel with {}", log_pubkey!(counterparty_node_id));
+			return;
+		}
+
+		#[cfg(not(test))]
+		if msg.data.len() > 1024 {
+			log_debug!(logger, "We do not allow more than 1 KiB of data for each peer in peer storage.
Sending warning to peer {}", log_pubkey!(counterparty_node_id));
+			peer_state.pending_msg_events.push(events::MessageSendEvent::HandleError {
+				node_id: counterparty_node_id.clone(),
+				action: msgs::ErrorAction::SendWarningMessage {
+					msg: msgs::WarningMessage {
+						channel_id: ChannelId([0; 32]),
+						data: "Supports only data up to 1 KiB in peer storage.".to_owned()
+					},
+					log_level: Level::Trace,
+				}
+			});
+			return;
+		}
+
+		log_trace!(logger, "Received Peer Storage from {}", log_pubkey!(counterparty_node_id));
+		peer_state.peer_storage = msg.data.clone();
+	}
+
 	fn internal_funding_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> {
 		let best_block = *self.best_block.read().unwrap();
 		let per_peer_state = self.per_peer_state.read().unwrap();
@@ -10472,6 +10523,8 @@ where
 	}

 	fn handle_peer_storage(&self, counterparty_node_id: PublicKey, msg: &msgs::PeerStorageMessage) {
+		let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || NotifyOption::SkipPersistNoEvents);
+		self.internal_peer_storage(&counterparty_node_id, msg);
 	}

 	fn handle_your_peer_storage(&self, counterparty_node_id: PublicKey, msg: &msgs::YourPeerStorageMessage) {
 	}
@@ -10797,6 +10850,7 @@ where
 				monitor_update_blocked_actions: BTreeMap::new(),
 				actions_blocking_raa_monitor_updates: BTreeMap::new(),
 				is_connected: true,
+				peer_storage: Vec::new(),
 			}));
 		},
 		hash_map::Entry::Occupied(e) => {
@@ -10826,6 +10880,16 @@ where
 			let peer_state = &mut *peer_state_lock;
 			let pending_msg_events = &mut peer_state.pending_msg_events;

+			if !peer_state.peer_storage.is_empty() {
+				pending_msg_events.push(events::MessageSendEvent::SendYourPeerStorageMessage {
+					node_id: counterparty_node_id.clone(),
+					msg: msgs::YourPeerStorageMessage {
+						data: peer_state.peer_storage.clone()
+					},
+				});
+			}
+
 			for (_, phase) in peer_state.channel_by_id.iter_mut() {
 				match phase {
 					ChannelPhase::Funded(chan) => {
@@ -11911,6 +11975,12 @@ where
 			if !peer_state.ok_to_remove(false) {
 				peer_pubkey.write(writer)?;
 				peer_state.latest_features.write(writer)?;
+
+				(peer_state.peer_storage.len() as u64).write(writer)?;
+				for p in peer_state.peer_storage.iter() {
+					p.write(writer)?;
+				}
+
 				if !peer_state.monitor_update_blocked_actions.is_empty() {
 					monitor_update_blocked_actions_per_peer
 						.get_or_insert_with(Vec::new)
@@ -12426,6 +12496,7 @@ where
 				monitor_update_blocked_actions: BTreeMap::new(),
 				actions_blocking_raa_monitor_updates: BTreeMap::new(),
 				is_connected: false,
+				peer_storage: Vec::new(),
 			}
 		};

@@ -12436,6 +12507,15 @@ where
 			let peer_chans = funded_peer_channels.remove(&peer_pubkey).unwrap_or(new_hash_map());
 			let mut peer_state = peer_state_from_chans(peer_chans);
 			peer_state.latest_features = Readable::read(reader)?;
+
+			let peer_storage_count: u64 = Readable::read(reader)?;
+			let mut peer_storage: Vec<u8> = Vec::with_capacity(cmp::min(peer_storage_count as usize, MAX_ALLOC_SIZE / mem::size_of::<u8>()));
+			for i in 0..peer_storage_count {
+				let x = Readable::read(reader)?;
+				peer_storage.insert(i as usize, x);
+			}
+			peer_state.peer_storage = peer_storage;
+
 			per_peer_state.insert(peer_pubkey, Mutex::new(peer_state));
 		}

@@ -13090,6 +13170,7 @@ where

 			last_days_feerates: Mutex::new(VecDeque::new()),

+			our_peer_storage: FairRwLock::new(our_peer_storage),
 			logger: args.logger,
 			default_configuration: args.default_config,
 		};

From bab7fcd05d12f2413c57959a2f8dd1921621511e Mon Sep 17 00:00:00 2001
From: Aditya Sharma
Date: Sun, 14 Jul 2024 03:14:42 +0530
Subject: [PATCH 04/11] lightning: Add struct for building OurPeerStorage,
 it gets updated every time a LatestCounterpartyCommitmentTXInfo update is
 sent. It will be encrypted and sent to our peers.

---
 lightning/src/chain/channelmonitor.rs |  15 ++
 lightning/src/ln/channel.rs           |  14 +-
 lightning/src/ln/channelmanager.rs    |  60 ++++++-
 lightning/src/ln/mod.rs               |   2 +
 lightning/src/ln/our_peer_storage.rs  | 229 ++++++++++++++++++++++++++
 5 files changed, 316 insertions(+), 4 deletions(-)
 create mode 100644 lightning/src/ln/our_peer_storage.rs

diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs
index 86f0d3de5ed..3ec013a076b 100644
--- a/lightning/src/chain/channelmonitor.rs
+++ b/lightning/src/chain/channelmonitor.rs
@@ -38,6 +38,8 @@ use crate::ln::msgs::DecodeError;
 use crate::ln::channel_keys::{DelayedPaymentKey, DelayedPaymentBasepoint, HtlcBasepoint, HtlcKey, RevocationKey, RevocationBasepoint};
 use crate::ln::chan_utils::{self,CommitmentTransaction, CounterpartyCommitmentSecrets, HTLCOutputInCommitment, HTLCClaim, ChannelTransactionParameters, HolderCommitmentTransaction, TxCreationKeys};
 use crate::ln::channelmanager::{HTLCSource, SentHTLCId};
+use crate::ln::our_peer_storage::StubChannelMonitor;
+use crate::ln::features::ChannelTypeFeatures;
 use crate::chain;
 use crate::chain::{BestBlock, WatchedOutput};
 use crate::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator, LowerBoundedFeeEstimator};
@@ -1532,6 +1534,19 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitor<Signer> {
 		self.inner.lock().unwrap().get_latest_update_id()
 	}

+	/// Gets the latest claiming info from the ChannelMonitor to update our peer storage backup.
+	pub(crate) fn get_latest_commitment_txn_and_its_claiming_info(&self) -> Option<(Txid, Vec<(HTLCOutputInCommitment, Option<Box<HTLCSource>>)>, Option<(u64, PublicKey, Option<PublicKey>)>)> {
+		let lock = self.inner.lock().unwrap();
+		if let Some(latest_txid) = lock.current_counterparty_commitment_txid {
+			return Some((
+				latest_txid, lock.counterparty_claimable_outpoints.get(&latest_txid).unwrap().clone(),
+				lock.their_cur_per_commitment_points
+			))
+		}
+
+		None
+	}
+
 	/// Gets the funding transaction outpoint of the channel this ChannelMonitor is monitoring for.
pub fn get_funding_txo(&self) -> (OutPoint, ScriptBuf) { self.inner.lock().unwrap().get_funding_txo().clone() diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 250704ea2a7..699cec4cd1c 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -2074,6 +2074,18 @@ impl ChannelContext where SP::Target: SignerProvider { self.update_time_counter } + pub fn get_commitment_secret(&self) -> CounterpartyCommitmentSecrets { + self.commitment_secrets.clone() + } + + pub fn get_channel_keys_id(&self) -> [u8;32] { + self.channel_keys_id + } + + pub fn get_commitment_txn_number_obscure_factor(&self) -> u64 { + get_commitment_transaction_number_obscure_factor(&self.get_holder_pubkeys().payment_point, &self.get_counterparty_pubkeys().payment_point, self.is_outbound()) + } + pub fn get_latest_monitor_update_id(&self) -> u64 { self.latest_monitor_update_id } @@ -2372,7 +2384,7 @@ impl ChannelContext where SP::Target: SignerProvider { height.checked_sub(self.funding_tx_confirmation_height).map_or(0, |c| c + 1) } - fn get_holder_selected_contest_delay(&self) -> u16 { + pub fn get_holder_selected_contest_delay(&self) -> u16 { self.channel_transaction_parameters.holder_selected_contest_delay } diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 4ab6f82ae21..c7adfe137c3 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -40,13 +40,14 @@ use crate::blinded_path::payment::{BlindedPaymentPath, Bolt12OfferContext, Bolt1 use crate::chain; use crate::chain::{Confirm, ChannelMonitorUpdateStatus, Watch, BestBlock}; use crate::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator, LowerBoundedFeeEstimator}; -use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, WithChannelMonitor, ChannelMonitorUpdateStep, HTLC_FAIL_BACK_BUFFER, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY, MonitorEvent, CLOSED_CHANNEL_UPDATE_ID}; +use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateStep, MonitorEvent, WithChannelMonitor, ANTI_REORG_DELAY, CLOSED_CHANNEL_UPDATE_ID, CLTV_CLAIM_BUFFER, HTLC_FAIL_BACK_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, STUB_CHANNEL_UPDATE_IDENTIFIER}; use crate::chain::transaction::{OutPoint, TransactionData}; use crate::events; use crate::events::{Event, EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination, PaymentFailureReason, ReplayEvent}; // Since this struct is returned in `list_channels` methods, expose it here in case users want to // construct one themselves. 
use crate::ln::inbound_payment; +use crate::ln::our_peer_storage::{OurPeerStorage, StubChannelMonitor}; use crate::ln::types::{ChannelId, PaymentHash, PaymentPreimage, PaymentSecret}; use crate::ln::channel::{self, Channel, ChannelPhase, ChannelContext, ChannelError, ChannelUpdateStatus, ShutdownResult, UnfundedChannelContext, UpdateFulfillCommitFetch, OutboundV1Channel, InboundV1Channel, WithChannelContext}; use crate::ln::channel_state::ChannelDetails; @@ -6876,6 +6877,8 @@ where if let Some(raa_blocker) = raa_blocker_opt { peer_state.actions_blocking_raa_monitor_updates.entry(chan_id).or_insert_with(Vec::new).push(raa_blocker); } + + let _ = self.our_peer_storage.write().unwrap().update_state_from_monitor_update(chan.context.channel_id(), monitor_update.clone()); if !during_init { handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock, peer_state, per_peer_state, chan); @@ -8015,7 +8018,18 @@ where let mut pending_events = self.pending_events.lock().unwrap(); emit_channel_ready_event!(pending_events, chan); } - + // Update Peer Storage. + let counterparty_channel_parameters = chan.context.channel_transaction_parameters.counterparty_parameters.as_ref().unwrap(); + let counterparty_delayed_payment_base_key = counterparty_channel_parameters.pubkeys.delayed_payment_basepoint; + let counterparty_htlc_base_key = counterparty_channel_parameters.pubkeys.htlc_basepoint; + let stub_chan = StubChannelMonitor::new(chan.context.channel_id(), chan.context.get_funding_txo().unwrap(), chan.context.get_value_satoshis(), + chan.context.get_channel_keys_id(), chan.context.get_commitment_secret(), + chan.context.get_counterparty_node_id(), counterparty_delayed_payment_base_key, counterparty_htlc_base_key, + chan.context.get_holder_selected_contest_delay(), + chan.context.get_commitment_txn_number_obscure_factor(), None, + None, chan.context.channel_transaction_parameters.channel_type_features.clone(), + self.current_best_block()); + self.our_peer_storage.write().unwrap().stub_channel(stub_chan); Ok(()) } else { try_chan_phase_entry!(self, Err(ChannelError::close( @@ -8363,6 +8377,7 @@ where let funding_txo = chan.context.get_funding_txo(); let monitor_update_opt = try_chan_phase_entry!(self, chan.commitment_signed(&msg, &&logger), chan_phase_entry); if let Some(monitor_update) = monitor_update_opt { + let _ = self.our_peer_storage.write().unwrap().update_state_from_monitor_update(chan.context.channel_id(), monitor_update.clone()); handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update, peer_state_lock, peer_state, per_peer_state, chan); } @@ -8563,9 +8578,14 @@ where } else { false }; let (htlcs_to_fail, monitor_update_opt) = try_chan_phase_entry!(self, chan.revoke_and_ack(&msg, &self.fee_estimator, &&logger, mon_update_blocked), chan_phase_entry); + + let mut our_peer_storage = self.our_peer_storage.write().unwrap(); if let Some(monitor_update) = monitor_update_opt { let funding_txo = funding_txo_opt .expect("Funding outpoint must have been set for RAA handling to succeed"); + + let _ = our_peer_storage.update_state_from_monitor_update(chan.context.channel_id(), monitor_update.clone()); + handle_new_monitor_update!(self, funding_txo, monitor_update, peer_state_lock, peer_state, per_peer_state, chan); } @@ -8909,6 +8929,7 @@ where } if let Some(monitor_update) = monitor_opt { has_monitor_update = true; + let _ = self.our_peer_storage.write().unwrap().update_state_from_monitor_update(chan.context.channel_id(), monitor_update.clone()); 
handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update, peer_state_lock, peer_state, per_peer_state, chan); @@ -10146,7 +10167,7 @@ where } channel.best_block_updated(height, header.time, self.chain_hash, &self.node_signer, &self.default_configuration, &&WithChannelContext::from(&self.logger, &channel.context, None)) }); - + self.our_peer_storage.write().unwrap().update_best_block(header, height); macro_rules! max_time { ($timestamp: expr) => { loop { @@ -12327,6 +12348,8 @@ where let mut channel_closures = VecDeque::new(); let mut close_background_events = Vec::new(); let mut funding_txo_to_channel_id = hash_map_with_capacity(channel_count as usize); + let mut our_peer_storage: OurPeerStorage = OurPeerStorage::new(); + for _ in 0..channel_count { let mut channel: Channel = Channel::read(reader, ( &args.entropy_source, &args.signer_provider, best_block_height, &provided_channel_type_features(&args.default_config) @@ -12335,7 +12358,38 @@ where let funding_txo = channel.context.get_funding_txo().ok_or(DecodeError::InvalidValue)?; funding_txo_to_channel_id.insert(funding_txo, channel.context.channel_id()); funding_txo_set.insert(funding_txo.clone()); + let counterparty_channel_parameters = channel.context.channel_transaction_parameters.counterparty_parameters.as_ref().unwrap(); + let counterparty_delayed_payment_base_key = counterparty_channel_parameters.pubkeys.delayed_payment_basepoint; + let counterparty_htlc_base_key = counterparty_channel_parameters.pubkeys.htlc_basepoint; + + let stub_chan = StubChannelMonitor::new( + channel.context.channel_id(), + funding_txo, + channel.context.get_value_satoshis(), + channel.context.get_channel_keys_id(), + channel.context.get_commitment_secret(), + channel.context.get_counterparty_node_id(), + counterparty_delayed_payment_base_key, + counterparty_htlc_base_key, + channel.context.get_holder_selected_contest_delay(), + channel.context.get_commitment_txn_number_obscure_factor(), + None, + None, + channel.context.channel_transaction_parameters.channel_type_features.clone(), + BestBlock::new(best_block_hash.clone(), best_block_height.clone()), + ); + our_peer_storage.stub_channel(stub_chan); if let Some(ref mut monitor) = args.channel_monitors.get_mut(&funding_txo) { + if let Some(latest_commitment_txn_info) = monitor.get_latest_commitment_txn_and_its_claiming_info() { + + our_peer_storage.update_latest_state(monitor.channel_id(), latest_commitment_txn_info.0, latest_commitment_txn_info.2); + } + + if monitor.get_latest_update_id() == STUB_CHANNEL_UPDATE_IDENTIFIER { + log_error!(logger, "ChannelMonitor for {} is stale and recovered from Peer Storage, it is not safe to run the node in normal mode.", monitor.channel_id()); + return Err(DecodeError::DangerousValue); + } + if channel.get_cur_holder_commitment_transaction_number() > monitor.get_cur_holder_commitment_number() || channel.get_revoked_counterparty_commitment_transaction_number() > monitor.get_min_seen_secret() || channel.get_cur_counterparty_commitment_transaction_number() > monitor.get_cur_counterparty_commitment_number() || diff --git a/lightning/src/ln/mod.rs b/lightning/src/ln/mod.rs index dceb52ab4ae..2e986579c63 100644 --- a/lightning/src/ln/mod.rs +++ b/lightning/src/ln/mod.rs @@ -30,6 +30,8 @@ pub mod chan_utils; pub mod features; pub mod script; pub mod types; +pub mod fundrecoverer; +pub mod our_peer_storage; // TODO: These modules were moved from lightning-invoice and need to be better integrated into this // crate now: diff --git a/lightning/src/ln/our_peer_storage.rs 
b/lightning/src/ln/our_peer_storage.rs
new file mode 100644
index 00000000000..3aaf61f6056
--- /dev/null
+++ b/lightning/src/ln/our_peer_storage.rs
@@ -0,0 +1,229 @@
+use crate::chain::BestBlock;
+use crate::ln::types::ChannelId;
+use bitcoin::hash_types::Txid;
+use bitcoin::secp256k1::PublicKey;
+use bitcoin::block::Header;
+
+use crate::chain::channelmonitor::{
+	ChannelMonitorUpdate, ChannelMonitorUpdateStep,
+};
+use crate::crypto::chacha20poly1305rfc::ChaCha20Poly1305RFC;
+
+use crate::util::ser::{Writeable, VecWriter};
+
+use crate::prelude::*;
+use crate::chain::transaction::OutPoint;
+use crate::ln::chan_utils::CounterpartyCommitmentSecrets;
+use crate::ln::channel_keys::{DelayedPaymentBasepoint, HtlcBasepoint};
+use crate::ln::features::ChannelTypeFeatures;
+
+/// [`StubChannelMonitor`] is the smallest unit of [`OurPeerStorage`]; it contains
+/// the information about a single channel that we need to recover funds on-chain.
+#[derive(Clone, PartialEq, Eq)]
+pub struct StubChannelMonitor {
+	pub(crate) channel_id: ChannelId,
+	pub(crate) funding_outpoint: OutPoint,
+	pub(crate) channel_value_stoshis: u64,
+	pub(crate) channel_keys_id: [u8; 32],
+	pub(crate) commitment_secrets: CounterpartyCommitmentSecrets,
+	pub(crate) counterparty_node_id: PublicKey,
+	pub(crate) counterparty_delayed_payment_base_key: DelayedPaymentBasepoint,
+	pub(crate) counterparty_htlc_base_key: HtlcBasepoint,
+	pub(crate) on_counterparty_tx_csv: u16,
+	pub(crate) obscure_factor: u64,
+	pub(crate) latest_state: Option<Txid>,
+	pub(crate) their_cur_per_commitment_points: Option<(u64, PublicKey, Option<PublicKey>)>,
+	pub(crate) features: ChannelTypeFeatures,
+	pub(crate) best_block: BestBlock,
+}
+
+impl StubChannelMonitor {
+	pub(crate) fn new(channel_id: ChannelId, funding_outpoint: OutPoint, channel_value_stoshis: u64, channel_keys_id: [u8; 32],
+		commitment_secrets: CounterpartyCommitmentSecrets, counterparty_node_id: PublicKey, counterparty_delayed_payment_base_key: DelayedPaymentBasepoint, counterparty_htlc_base_key: HtlcBasepoint, on_counterparty_tx_csv: u16,
+		obscure_factor: u64, latest_state: Option<Txid>, their_cur_per_commitment_points: Option<(u64, PublicKey, Option<PublicKey>)>,
+		features: ChannelTypeFeatures, best_block: BestBlock) -> Self {
+		StubChannelMonitor {
+			channel_id,
+			funding_outpoint,
+			channel_value_stoshis,
+			channel_keys_id,
+			commitment_secrets,
+			counterparty_node_id,
+			counterparty_delayed_payment_base_key,
+			counterparty_htlc_base_key,
+			on_counterparty_tx_csv,
+			obscure_factor,
+			latest_state,
+			their_cur_per_commitment_points,
+			features,
+			best_block,
+		}
+	}
+
+	/// Gets the min seen secret from the commitment secrets.
+	pub fn get_min_seen_secret(&self) -> u64 {
+		return self.commitment_secrets.get_min_seen_secret();
+	}
+}
+
+impl_writeable_tlv_based!(StubChannelMonitor, {
+	(0, channel_id, required),
+	(2, channel_keys_id, required),
+	(4, channel_value_stoshis, required),
+	(6, funding_outpoint, required),
+	(8, commitment_secrets, required),
+	(10, counterparty_node_id, required),
+	(12, counterparty_delayed_payment_base_key, required),
+	(14, counterparty_htlc_base_key, required),
+	(16, on_counterparty_tx_csv, required),
+	(18, obscure_factor, required),
+	(20, latest_state, required),
+	(22, their_cur_per_commitment_points, option),
+	(24, features, required),
+	(26, best_block, required),
+});
+
+/// [`OurPeerStorage`] is used to store our channels, from which we can create our peer storage backup.
+/// It includes a timestamp for comparing two given
+/// [`OurPeerStorage`] instances and a version that defines the serialized structure.
+#[derive(Clone, PartialEq, Eq)]
+pub struct OurPeerStorage {
+	version: u32,
+	timestamp: u32,
+	channels: Vec<StubChannelMonitor>,
+}
+
+impl OurPeerStorage {
+	/// Returns an [`OurPeerStorage`] with version 1 and the current timestamp.
+	pub fn new() -> Self {
+		let duration_since_epoch = std::time::SystemTime::now()
+			.duration_since(std::time::SystemTime::UNIX_EPOCH)
+			.expect("Time must be > 1970");
+
+		Self {
+			version: 1,
+			timestamp: duration_since_epoch.as_secs() as u32,
+			channels: Vec::new(),
+		}
+	}
+
+	/// Stubs a channel inside [`OurPeerStorage`]
+	pub fn stub_channel(&mut self, chan: StubChannelMonitor) {
+		self.channels.push(chan);
+	}
+
+	/// Gets a reference to the `channels` vector of [`OurPeerStorage`]
+	pub fn get_channels(&self) -> &Vec<StubChannelMonitor> {
+		self.channels.as_ref()
+	}
+
+	pub(crate) fn update_latest_state(&mut self, cid: ChannelId, txid: Txid, their_cur_per_commitment_points: Option<(u64, PublicKey, Option<PublicKey>)>) {
+		for stub_channel in &mut self.channels {
+			if stub_channel.channel_id == cid {
+				stub_channel.latest_state = Some(txid);
+				stub_channel.their_cur_per_commitment_points = their_cur_per_commitment_points;
+				return;
+			}
+		}
+	}
+
+	pub(crate) fn provide_secret(&mut self, cid: ChannelId, idx: u64, secret: [u8; 32]) -> Result<(), ()> {
+		for stub_channel in &mut self.channels {
+			if stub_channel.channel_id == cid {
+				return stub_channel.commitment_secrets.provide_secret(idx, secret);
+			}
+		}
+		return Err(());
+	}
+
+	/// This is called to update the data of the latest state inside [`OurPeerStorage`] using
+	/// [`ChannelMonitorUpdateStep::LatestCounterpartyCommitmentTXInfo`]
+	pub(crate) fn update_state_from_monitor_update(&mut self, cid: ChannelId, monitor_update: ChannelMonitorUpdate) -> Result<(), ()> {
+		for update in monitor_update.updates.iter() {
+			match update {
+				ChannelMonitorUpdateStep::LatestCounterpartyCommitmentTXInfo { commitment_txid, htlc_outputs, commitment_number,
+					their_per_commitment_point, ..
} => {
+					let stub_channels = &self.channels;
+					let mut cur_per_commitment_points = None;
+					for stub_channel in stub_channels {
+						if stub_channel.channel_id == cid {
+							match stub_channel.their_cur_per_commitment_points {
+								Some(old_points) => {
+									if old_points.0 == commitment_number + 1 {
+										cur_per_commitment_points = Some((old_points.0, old_points.1, Some(*their_per_commitment_point)));
+									} else if old_points.0 == commitment_number + 2 {
+										if let Some(old_second_point) = old_points.2 {
+											cur_per_commitment_points = Some((old_points.0 - 1, old_second_point, Some(*their_per_commitment_point)));
+										} else {
+											cur_per_commitment_points = Some((*commitment_number, *their_per_commitment_point, None));
+										}
+									} else {
+										cur_per_commitment_points = Some((*commitment_number, *their_per_commitment_point, None));
+									}
+								},
+								None => {
+									cur_per_commitment_points = Some((*commitment_number, *their_per_commitment_point, None));
+								}
+							}
+						}
+					}
+					self.update_latest_state(cid, *commitment_txid, cur_per_commitment_points);
+					return Ok(());
+				}
+				ChannelMonitorUpdateStep::CommitmentSecret { idx, secret } => {
+					let _ = self.provide_secret(cid, *idx, *secret);
+					return Ok(());
+				}
+				_ => {}
+			}
+		}
+		Err(())
+	}
+
+	pub fn update_best_block(&mut self, header: &Header, height: u32) {
+		for channel in &mut self.channels {
+			channel.best_block = BestBlock::new(header.block_hash(), height);
+		}
+	}
+
+	/// Encrypts [`OurPeerStorage`] using the `key` and returns a Vec<u8> containing the result.
+	pub fn encrypt_our_peer_storage(&self, key: [u8; 32]) -> Vec<u8> {
+		let n = 0u64;
+		let mut peer_storage = VecWriter(Vec::new());
+		self.write(&mut peer_storage).unwrap();
+		let mut res = vec![0; peer_storage.0.len() + 16];
+
+		let plaintext = &peer_storage.0[..];
+		let mut nonce = [0; 12];
+		nonce[4..].copy_from_slice(&n.to_le_bytes()[..]);
+
+		let mut chacha = ChaCha20Poly1305RFC::new(&key, &nonce, b"");
+		let mut tag = [0; 16];
+		chacha.encrypt(plaintext, &mut res[0..plaintext.len()], &mut tag);
+		res[plaintext.len()..].copy_from_slice(&tag);
+		res
+	}
+
+	/// Decrypts `OurPeerStorage` using the `key`; the result is stored inside `res`.
+	/// Returns an error if the `cyphertext` is not correct.
+	pub fn decrypt_our_peer_storage(&self, res: &mut [u8], cyphertext: &[u8], key: [u8; 32]) -> Result<(), ()> {
+		let n = 0u64;
+		let mut nonce = [0; 12];
+		nonce[4..].copy_from_slice(&n.to_le_bytes()[..]);
+
+		let mut chacha = ChaCha20Poly1305RFC::new(&key, &nonce, b"");
+		if chacha.variable_time_decrypt(&cyphertext[0..cyphertext.len() - 16], res, &cyphertext[cyphertext.len() - 16..]).is_err() {
+			return Err(());
+		}
+		Ok(())
+	}
+}
+
+impl_writeable_tlv_based!(OurPeerStorage, {
+	(0, version, (default_value, 1)),
+	(2, timestamp, required),
+	(4, channels, optional_vec),
+});

From 1923863714618a091cc9996339d2d0607de7d2a5 Mon Sep 17 00:00:00 2001
From: Aditya Sharma
Date: Sun, 14 Jul 2024 03:21:59 +0530
Subject: [PATCH 05/11] lightning: Add a key inside NodeSigner which is used
 to encrypt or decrypt the peer storage, and send PeerStorage on every RAA
 and upon reconnection.
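
For intuition (a sketch, not part of this patch): the 32-byte key returned
by `NodeSigner::get_peer_storage_key` both encrypts and authenticates the
blob via ChaCha20-Poly1305, so on retrieval a node can verify that the
backup is its own and untampered:

    // Illustrative sketch only, assuming a `KeysManager` `keys` and an
    // `OurPeerStorage` `ops` as introduced in the previous commit.
    let key = keys.get_peer_storage_key();
    let blob = ops.encrypt_our_peer_storage(key);
    // The ciphertext carries a trailing 16-byte Poly1305 tag.
    let mut plaintext = vec![0u8; blob.len() - 16];
    ops.decrypt_our_peer_storage(&mut plaintext, &blob, key)
        .expect("MAC check failed: blob was not produced with this key");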
---
 lightning/src/ln/channelmanager.rs | 49 ++++++++++++++++++++++++++++++
 lightning/src/sign/mod.rs          | 45 +++++++++++++++++++++++++++
 2 files changed, 94 insertions(+)

diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index c7adfe137c3..ed1fc8de914 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -2332,6 +2332,9 @@ where

 	inbound_payment_key: inbound_payment::ExpandedKey,

+	/// The key used to encrypt our peer storage that will be sent to our peers.
+	our_peerstorage_encryption_key: [u8; 32],
+
 	/// LDK puts the [fake scids] that it generates into namespaces, to identify the type of an
 	/// incoming payment. To make it harder for a third-party to identify the type of a payment,
 	/// we encrypt the namespace identifier using these bytes.
@@ -3212,6 +3215,7 @@ where
 		secp_ctx.seeded_randomize(&entropy_source.get_secure_random_bytes());
 		let inbound_pmt_key_material = node_signer.get_inbound_payment_key_material();
 		let expanded_inbound_key = inbound_payment::ExpandedKey::new(&inbound_pmt_key_material);
+		let our_peerstorage_encryption_key = node_signer.get_peer_storage_key();
 		ChannelManager {
 			default_configuration: config.clone(),
 			chain_hash: ChainHash::using_genesis_block(params.network),
@@ -3237,6 +3241,8 @@ where
 			secp_ctx,
 			inbound_payment_key: expanded_inbound_key,

+			our_peerstorage_encryption_key,
+
 			fake_scid_rand_bytes: entropy_source.get_secure_random_bytes(),
 			probing_cookie_secret: entropy_source.get_secure_random_bytes(),

@@ -3274,6 +3280,13 @@ where
 		&self.default_configuration
 	}

+	/// Returns the encrypted [`OurPeerStorage`] which can be distributed among our peers.
+	/// We use a key derived from our seed to encrypt this.
+	pub fn get_encrypted_our_peer_storage(&self) -> Vec<u8> {
+		let our_peer_storage = self.our_peer_storage.read().unwrap();
+		our_peer_storage.encrypt_our_peer_storage(self.our_peerstorage_encryption_key)
+	}
+
 	fn create_and_insert_outbound_scid_alias(&self) -> u64 {
 		let height = self.best_block.read().unwrap().height;
 		let mut outbound_scid_alias = 0;
@@ -8598,6 +8611,30 @@ where
 				hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
 			}
 		};
+
+		{
+			let per_peer_state = self.per_peer_state.read().unwrap();
+			let mut peer_state_lock = per_peer_state.get(counterparty_node_id)
+				.ok_or_else(|| {
+					debug_assert!(false);
+					MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)
+				}).map(|mtx| mtx.lock().unwrap())?;
+			let peer_state = &mut *peer_state_lock;
+			let our_peer_storage = self.get_encrypted_our_peer_storage();
+
+			for context in peer_state.channel_by_id.iter().map(|(_, phase)| phase.context()) {
+				// Update the latest PeerStorage for the peer.
+ peer_state.pending_msg_events.push( + events::MessageSendEvent::SendPeerStorageMessage { + node_id: context.get_counterparty_node_id(), + msg: msgs::PeerStorageMessage { + data: our_peer_storage.clone() + }, + } + ); + } + } + self.fail_holding_cell_htlcs(htlcs_to_fail, msg.channel_id, counterparty_node_id); Ok(()) } @@ -10899,6 +10936,7 @@ where if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; + let num_channels = peer_state.total_channel_count(); let pending_msg_events = &mut peer_state.pending_msg_events; if !peer_state.peer_storage.is_empty() { @@ -10910,6 +10948,15 @@ where }); } + if peer_state.latest_features.supports_provide_peer_storage() && num_channels > 0 { + let our_peer_storage = self.get_encrypted_our_peer_storage(); + pending_msg_events.push(events::MessageSendEvent::SendPeerStorageMessage { + node_id: counterparty_node_id.clone(), + msg: msgs::PeerStorageMessage { + data: our_peer_storage + }, + }); + } for (_, phase) in peer_state.channel_by_id.iter_mut() { match phase { @@ -12955,6 +13002,7 @@ where let inbound_pmt_key_material = args.node_signer.get_inbound_payment_key_material(); let expanded_inbound_key = inbound_payment::ExpandedKey::new(&inbound_pmt_key_material); + let our_peerstorage_encryption_key = args.node_signer.get_peer_storage_key(); let mut claimable_payments = hash_map_with_capacity(claimable_htlcs_list.len()); if let Some(purposes) = claimable_htlc_purposes { @@ -13180,6 +13228,7 @@ where best_block: RwLock::new(BestBlock::new(best_block_hash, best_block_height)), inbound_payment_key: expanded_inbound_key, + our_peerstorage_encryption_key, pending_inbound_payments: Mutex::new(pending_inbound_payments), pending_outbound_payments: pending_outbounds, pending_intercepted_htlcs: Mutex::new(pending_intercepted_htlcs.unwrap()), diff --git a/lightning/src/sign/mod.rs b/lightning/src/sign/mod.rs index 8ad34f2d653..7c1802e9372 100644 --- a/lightning/src/sign/mod.rs +++ b/lightning/src/sign/mod.rs @@ -835,6 +835,35 @@ pub trait NodeSigner { /// [phantom node payments]: PhantomKeysManager fn get_inbound_payment_key_material(&self) -> KeyMaterial; + /// Generates a 32-byte key used for peer storage encryption. + /// + /// This function derives an encryption key for peer storage by using the HKDF + /// (HMAC-based Key Derivation Function) with a specific label and the node + /// secret key. The derived key is used for encrypting or decrypting peer storage + /// data. + /// + /// The process involves the following steps: + /// 1. Retrieves the node secret key. + /// 2. Uses the node secret key and the label `"Peer Storage Encryption Key"` + /// to perform HKDF extraction and expansion. + /// 3. Returns the first part of the derived key, which is a 32-byte array. + /// + /// # Returns + /// + /// Returns a 32-byte array that serves as the encryption key for peer storage. + /// + /// # Panics + /// + /// This function does not panic under normal circumstances, but failures in + /// obtaining the node secret key or issues within the HKDF function may cause + /// unexpected behavior. + /// + /// # Notes + /// + /// Ensure that the node secret key is securely managed, as it is crucial for + /// the security of the derived encryption key. + fn get_peer_storage_key(&self) -> [u8; 32]; + /// Get node id based on the provided [`Recipient`]. 
/// /// This method must return the same value each time it is called with a given [`Recipient`] @@ -2174,6 +2203,14 @@ impl NodeSigner for KeysManager { self.inbound_payment_key.clone() } + fn get_peer_storage_key(&self) -> [u8; 32] { + let (t1, _) = hkdf_extract_expand_twice( + b"Peer Storage Encryption Key", + &self.get_node_secret_key().secret_bytes(), + ); + t1 + } + fn sign_invoice( &self, invoice: &RawBolt11Invoice, recipient: Recipient, ) -> Result { @@ -2352,6 +2389,14 @@ impl NodeSigner for PhantomKeysManager { self.inbound_payment_key.clone() } + fn get_peer_storage_key(&self) -> [u8; 32] { + let (t1, _) = hkdf_extract_expand_twice( + b"Peer Storage Encryption Key", + &self.get_node_secret_key().secret_bytes(), + ); + t1 + } + fn sign_invoice( &self, invoice: &RawBolt11Invoice, recipient: Recipient, ) -> Result { From 02737f77aaa1704709ee45f79966ee81339200bd Mon Sep 17 00:00:00 2001 From: Aditya Sharma Date: Sun, 14 Jul 2024 03:38:06 +0530 Subject: [PATCH 06/11] channelmonitor: Create StubChannelMonitor so that channels can be stubbed to recover from PeerStorage. --- lightning/src/chain/channelmonitor.rs | 113 +++++++++++++++++++++++++- lightning/src/ln/channelmanager.rs | 2 +- 2 files changed, 111 insertions(+), 4 deletions(-) diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 3ec013a076b..51771c9b8c3 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -36,20 +36,21 @@ use crate::ln::channel::INITIAL_COMMITMENT_NUMBER; use crate::ln::types::{PaymentHash, PaymentPreimage, ChannelId}; use crate::ln::msgs::DecodeError; use crate::ln::channel_keys::{DelayedPaymentKey, DelayedPaymentBasepoint, HtlcBasepoint, HtlcKey, RevocationKey, RevocationBasepoint}; -use crate::ln::chan_utils::{self,CommitmentTransaction, CounterpartyCommitmentSecrets, HTLCOutputInCommitment, HTLCClaim, ChannelTransactionParameters, HolderCommitmentTransaction, TxCreationKeys}; +use crate::ln::chan_utils::{self, CommitmentTransaction, CounterpartyCommitmentSecrets, HTLCOutputInCommitment, HTLCClaim, ChannelTransactionParameters, HolderCommitmentTransaction, TxCreationKeys}; use crate::ln::channelmanager::{HTLCSource, SentHTLCId}; -use crate::ln::our_peer_storage::StubChannelMonitor; use crate::ln::features::ChannelTypeFeatures; +use crate::ln::our_peer_storage::StubChannelMonitor; use crate::chain; use crate::chain::{BestBlock, WatchedOutput}; use crate::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator, LowerBoundedFeeEstimator}; use crate::chain::transaction::{OutPoint, TransactionData}; +use crate::crypto::chacha20poly1305rfc::ChaCha20Poly1305RFC; use crate::sign::{ChannelDerivationParameters, HTLCDescriptor, SpendableOutputDescriptor, StaticPaymentOutputDescriptor, DelayedPaymentOutputDescriptor, ecdsa::EcdsaChannelSigner, SignerProvider, EntropySource}; use crate::chain::onchaintx::{ClaimEvent, FeerateStrategy, OnchainTxHandler}; use crate::chain::package::{CounterpartyOfferedHTLCOutput, CounterpartyReceivedHTLCOutput, HolderFundingOutput, HolderHTLCOutput, PackageSolvingData, PackageTemplate, RevokedOutput, RevokedHTLCOutput}; use crate::chain::Filter; use crate::util::logger::{Logger, Record}; -use crate::util::ser::{Readable, ReadableArgs, RequiredWrapper, MaybeReadable, UpgradableRequired, Writer, Writeable, U48}; +use crate::util::ser::{Readable, ReadableArgs, RequiredWrapper, MaybeReadable, UpgradableRequired, Writer, Writeable, U48, VecWriter}; use crate::util::byte_utils; use 
crate::events::{ClosureReason, Event, EventHandler, ReplayEvent};
 use crate::events::bump_transaction::{AnchorDescriptor, BumpTransactionEvent};
@@ -115,6 +116,11 @@ pub struct ChannelMonitorUpdate {
 /// No other [`ChannelMonitorUpdate`]s are allowed after force-close.
 pub const CLOSED_CHANNEL_UPDATE_ID: u64 = core::u64::MAX;

+/// This update ID is used inside [`ChannelMonitorImpl`] to recognise
+/// that we're dealing with a [`StubChannelMonitor`], since we require some
+/// exceptions while dealing with it.
+pub const STUB_CHANNEL_UPDATE_IDENTIFIER: u64 = core::u64::MAX - 1;
+
 impl Writeable for ChannelMonitorUpdate {
 	fn write(&self, w: &mut W) -> Result<(), io::Error> {
 		write_ver_prefix!(w, SERIALIZATION_VERSION, MIN_SERIALIZATION_VERSION);
@@ -1434,6 +1440,100 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitor<Signer> {
 		})
 	}

+	/// Returns a [`ChannelMonitor`] built from a [`StubChannelMonitor`] and the other
+	/// information needed to sweep funds and create penalty transactions.
+	pub(crate) fn new_stub(secp_ctx: Secp256k1<secp256k1::All>, stub_channel: &StubChannelMonitor, keys: Signer, channel_parameters: ChannelTransactionParameters, funding_info_scriptbuf: ScriptBuf, destination_script: ScriptBuf) -> ChannelMonitor<Signer> {
+		let mut outputs_to_watch = new_hash_map();
+		outputs_to_watch.insert(stub_channel.funding_outpoint.txid, vec![(stub_channel.funding_outpoint.index as u32, funding_info_scriptbuf.clone())]);
+		let dummy_key = PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[42; 32]).unwrap());
+		let dummy_sig = crate::crypto::utils::sign(&secp_ctx, &secp256k1::Message::from_digest_slice(&[42; 32]).unwrap(), &SecretKey::from_slice(&[42; 32]).unwrap());
+		let counterparty_payment_script = chan_utils::get_counterparty_payment_script(
+			&stub_channel.features, &keys.pubkeys().payment_point
+		);
+		let holder_revocation_basepoint = keys.pubkeys().revocation_basepoint;
+		let holder_commitment_tx = HolderSignedTx {
+			txid: stub_channel.funding_outpoint.txid,
+			revocation_key: RevocationKey(dummy_key),
+			a_htlc_key: HtlcKey(dummy_key),
+			b_htlc_key: HtlcKey(dummy_key),
+			delayed_payment_key: DelayedPaymentKey(dummy_key),
+			per_commitment_point: dummy_key,
+			htlc_outputs: Vec::new(), // There are never any HTLCs in the initial commitment transactions
+			to_self_value_sat: 0,
+			feerate_per_kw: 1,
+		};
+
+		let dummy_tx_creation_keys = TxCreationKeys {
+			per_commitment_point: dummy_key.clone(),
+			revocation_key: RevocationKey::from_basepoint(&secp_ctx, &RevocationBasepoint::from(dummy_key), &dummy_key),
+			broadcaster_htlc_key: HtlcKey::from_basepoint(&secp_ctx, &HtlcBasepoint::from(dummy_key), &dummy_key),
+			countersignatory_htlc_key: HtlcKey::from_basepoint(&secp_ctx, &HtlcBasepoint::from(dummy_key), &dummy_key),
+			broadcaster_delayed_payment_key: DelayedPaymentKey::from_basepoint(&secp_ctx, &DelayedPaymentBasepoint::from(dummy_key), &dummy_key),
+		};
+		let counterparty_htlc_sigs = Vec::new();
+		let mut nondust_htlcs: Vec<(HTLCOutputInCommitment, Option<Box<HTLCSource>>)> = Vec::new();
+		let inner = CommitmentTransaction::new_with_auxiliary_htlc_data(0, 0, 0, dummy_key.clone(), dummy_key.clone(), dummy_tx_creation_keys, 0, &mut nondust_htlcs, &channel_parameters.as_counterparty_broadcastable());
+		let holder_commitment = HolderCommitmentTransaction::new(inner, dummy_sig, counterparty_htlc_sigs, &dummy_key, &PublicKey::from_slice(&[2; 33]).unwrap());
+		let onchain_tx_handler = OnchainTxHandler::new(
+			stub_channel.channel_value_stoshis, stub_channel.channel_keys_id, destination_script.clone(), keys,
+			channel_parameters, holder_commitment, secp_ctx
+		);
+		let
counterparty_commitment_params = CounterpartyCommitmentParameters { counterparty_delayed_payment_base_key: stub_channel.counterparty_delayed_payment_base_key, + counterparty_htlc_base_key: stub_channel.counterparty_htlc_base_key, on_counterparty_tx_csv: stub_channel.on_counterparty_tx_csv }; + let mut counterparty_claimable_outpoints = new_hash_map(); + counterparty_claimable_outpoints.insert(stub_channel.latest_state.unwrap(), Vec::new()); + + Self::from_impl(ChannelMonitorImpl { + latest_update_id: STUB_CHANNEL_UPDATE_IDENTIFIER, + commitment_transaction_number_obscure_factor: stub_channel.obscure_factor, + destination_script: destination_script.clone(), + broadcasted_holder_revokable_script: None, + counterparty_payment_script, + shutdown_script: None, + channel_keys_id: stub_channel.channel_keys_id, + holder_revocation_basepoint, + channel_id: stub_channel.channel_id, + funding_info: (stub_channel.funding_outpoint, ScriptBuf::new()), + current_counterparty_commitment_txid: None, + prev_counterparty_commitment_txid: None, + counterparty_commitment_params, + funding_redeemscript: ScriptBuf::new(), + channel_value_satoshis: stub_channel.channel_value_stoshis, + their_cur_per_commitment_points: stub_channel.their_cur_per_commitment_points, + on_holder_tx_csv: 1, + commitment_secrets: stub_channel.commitment_secrets.clone(), + counterparty_claimable_outpoints, + + holder_pays_commitment_tx_fee: None, + + counterparty_hash_commitment_number: new_hash_map(), + counterparty_commitment_txn_on_chain: new_hash_map(), + counterparty_fulfilled_htlcs: new_hash_map(), + prev_holder_signed_commitment_tx: None, + current_holder_commitment_tx: holder_commitment_tx, + current_counterparty_commitment_number: 0, + current_holder_commitment_number: 1, + payment_preimages: new_hash_map(), + pending_monitor_events: Vec::new(), + pending_events: Vec::new(), + is_processing_pending_events: false, + onchain_events_awaiting_threshold_conf: Vec::new(), + outputs_to_watch, + onchain_tx_handler, + lockdown_from_offchain: true, + holder_tx_signed: true, + funding_spend_seen: false, + funding_spend_confirmed: None, + confirmed_commitment_tx_counterparty_output: None, + htlcs_resolved_on_chain: Vec::new(), + spendable_txids_confirmed: Vec::new(), + best_block: stub_channel.best_block, + counterparty_node_id: Some(stub_channel.counterparty_node_id), + initial_counterparty_commitment_info: None, + balances_empty_height: None + }) + } + #[cfg(test)] fn provide_secret(&self, idx: u64, secret: [u8; 32]) -> Result<(), &'static str> { self.inner.lock().unwrap().provide_secret(idx, secret) @@ -3508,6 +3608,10 @@ impl ChannelMonitorImpl { block_hash, per_commitment_claimable_data.iter().map(|(htlc, htlc_source)| (htlc, htlc_source.as_ref().map(|htlc_source| htlc_source.as_ref())) ), logger); + } else if self.latest_update_id == STUB_CHANNEL_UPDATE_IDENTIFIER { + // Since we aren't storing per commitment option inside stub channels. 
+ fail_unbroadcast_htlcs!(self, "revoked counterparty", commitment_txid, tx, height, + block_hash, [].iter().map(|reference| *reference), logger); } else { // Our fuzzers aren't constrained by pesky things like valid signatures, so can // spend our funding output with a transaction which doesn't match our past @@ -4268,6 +4372,9 @@ impl ChannelMonitorImpl { if *idx == input.previous_output.vout { #[cfg(test)] { + if self.latest_update_id == STUB_CHANNEL_UPDATE_IDENTIFIER { + return true; + } // If the expected script is a known type, check that the witness // appears to be spending the correct type (ie that the match would // actually succeed in BIP 158/159-style filters). diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index ed1fc8de914..9bc236e5407 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -7900,7 +7900,7 @@ where }; let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; - let logger = WithContext::from(&self.logger, Some(*counterparty_node_id), None); + let logger = WithContext::from(&self.logger, Some(*counterparty_node_id), None, None); // Check if we have any channels with the peer (currently we only provide the service to peers we have a channel with). if !peer_state.channel_by_id.values().any(|phase| matches!(phase, ChannelPhase::Funded(_))) { From e3b2ac98a580dc8ddaa39ccf228f5156a2696227 Mon Sep 17 00:00:00 2001 From: Aditya Sharma Date: Sun, 14 Jul 2024 03:43:15 +0530 Subject: [PATCH 07/11] chainmonitor: Add persistence logic for StubChannelMonitor and appropriate helpers to reload it. --- fuzz/src/chanmon_consistency.rs | 4 ++++ lightning/src/chain/chainmonitor.rs | 14 +++++++++++- lightning/src/chain/channelmonitor.rs | 9 ++++++++ lightning/src/chain/mod.rs | 9 ++++++++ lightning/src/ln/blinded_payment_tests.rs | 1 + lightning/src/ln/functional_test_utils.rs | 26 +++++++++++++++++++++++ lightning/src/util/test_utils.rs | 12 +++++++++++ 7 files changed, 74 insertions(+), 1 deletion(-) diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 9616b6f54b9..94647fdd898 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -287,6 +287,10 @@ impl chain::Watch for TestChainMonitor { ) -> Vec<(OutPoint, ChannelId, Vec, Option)> { return self.chain_monitor.release_pending_monitor_events(); } + + fn get_stub_cids_with_counterparty(&self, counterparty_node_id: PublicKey) -> Vec { + return self.chain_monitor.get_stub_cids_with_counterparty(counterparty_node_id); + } } struct KeyProvider { diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index afd9df62851..092738d1d80 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -230,6 +230,7 @@ pub struct ChainMonitor, { monitors: RwLock>>, + chain_source: Option, broadcaster: T, logger: L, @@ -266,7 +267,7 @@ where C::Target: chain::Filter, /// Calls which represent a new blockchain tip height should set `best_height`. fn process_chain_data(&self, header: &Header, best_height: Option, txdata: &TransactionData, process: FN) where - FN: Fn(&ChannelMonitor, &TransactionData) -> Vec + FN: Fn(&ChannelMonitor, &TransactionData) -> Vec, { let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. 
This indicates we cannot continue normal operation and must shut down."; let funding_outpoints = hash_set_from_iter(self.monitors.read().unwrap().keys().cloned()); @@ -749,6 +750,17 @@ where C::Target: chain::Filter, L::Target: Logger, P::Target: Persist, { + fn get_stub_cids_with_counterparty(&self, counterparty_node_id: PublicKey) -> Vec { + let stub_monitors = self.stub_monitors.read().unwrap(); + let mut stubs = vec![]; + for (_, mon) in stub_monitors.iter() { + if mon.get_counterparty_node_id() == Some(counterparty_node_id) { + stubs.push(mon.channel_id()); + } + } + stubs + } + fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor) -> Result { let logger = WithChannelMonitor::from(&self.logger, &monitor, None); let mut monitors = self.monitors.write().unwrap(); diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 51771c9b8c3..c66ca5ecd07 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -3532,6 +3532,15 @@ impl ChannelMonitorImpl { self.current_holder_commitment_number } + /// Updates the [`StubChannelMonitor`] when we receive newer + /// peer storage from our peer. This shouldn't be called through [`ChannelMonitor`]. + fn update_latest_state_from_new_stubchannelmonitor(&mut self, stub: &StubChannelMonitor) { + let inner = stub.inner.lock().unwrap(); + self.commitment_secrets = inner.commitment_secrets.clone(); + self.counterparty_claimable_outpoints = inner.counterparty_claimable_outpoints.clone(); + self.their_cur_per_commitment_points = inner.their_cur_per_commitment_points.clone(); + } + /// Attempts to claim a counterparty commitment transaction's outputs using the revocation key and /// data in counterparty_claimable_outpoints. Will directly claim any HTLC outputs which expire at a /// height > height + CLTV_SHARED_CLAIM_BUFFER. In any case, will install monitoring for diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs index 36b1ce57309..0e4c5e12341 100644 --- a/lightning/src/chain/mod.rs +++ b/lightning/src/chain/mod.rs @@ -305,6 +305,15 @@ pub trait Watch { /// For details on asynchronous [`ChannelMonitor`] updating and returning /// [`MonitorEvent::Completed`] here, see [`ChannelMonitorUpdateStatus::InProgress`]. fn release_pending_monitor_events(&self) -> Vec<(OutPoint, ChannelId, Vec, Option)>; + + /// Retrieves a list of channel IDs for [`StubChannelMonitor`] associated with a specific counterparty node ID. + /// + /// This function searches through the collection of [`StubChannelMonitor`] and collects the channel IDs + /// of those monitors that have the specified counterparty node ID. + /// + /// This is used by [`FundRecoverer`] to fetch all the [`ChannelId`] with a peer that needs recovery so that we can send them + /// `BogusChannelReestablish`.
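+ ///
+ /// For example (a sketch; `watch` stands in for any [`Watch`] implementation
+ /// that tracks stub monitors):
+ /// `let stub_cids = watch.get_stub_cids_with_counterparty(counterparty_node_id);`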
+ fn get_stub_cids_with_counterparty(&self, counterparty_node_id: PublicKey) -> Vec; } /// The `Filter` trait defines behavior for indicating chain activity of interest pertaining to diff --git a/lightning/src/ln/blinded_payment_tests.rs b/lightning/src/ln/blinded_payment_tests.rs index d099e439ae5..776eadab779 100644 --- a/lightning/src/ln/blinded_payment_tests.rs +++ b/lightning/src/ln/blinded_payment_tests.rs @@ -1466,6 +1466,7 @@ fn route_blinding_spec_test_vector() { fn sign_invoice( &self, _invoice: &RawBolt11Invoice, _recipient: Recipient, ) -> Result { unreachable!() } + fn get_peer_storage_key(&self) -> [u8;32] { unreachable!() } fn sign_bolt12_invoice_request( &self, _invoice_request: &UnsignedInvoiceRequest, ) -> Result { unreachable!() } diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index 6ae52d736d6..f08697cae64 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -716,6 +716,7 @@ impl<'a, 'b, 'c> Drop for Node<'a, 'b, 'c> { panic!(); } } + assert_eq!(*chain_source.watched_txn.unsafe_well_ordered_double_lock_self(), *self.chain_source.watched_txn.unsafe_well_ordered_double_lock_self()); assert_eq!(*chain_source.watched_outputs.unsafe_well_ordered_double_lock_self(), *self.chain_source.watched_outputs.unsafe_well_ordered_double_lock_self()); } @@ -2034,6 +2035,16 @@ pub fn do_main_commitment_signed_dance(node_a: &Node<'_, '_, '_>, node_b: &Node< check_added_monitors!(node_b, 0); assert!(node_b.node.get_and_clear_pending_msg_events().is_empty()); node_b.node.handle_revoke_and_ack(node_a.node.get_our_node_id(), &as_revoke_and_ack); + let events = node_b.node.get_and_clear_pending_msg_events(); + assert!(events.len() == 1); + + match events.get(0).unwrap() { + MessageSendEvent::SendPeerStorageMessage { ref node_id, ref msg } => { + assert_eq!(*node_id, node_a.node.get_our_node_id()); + node_a.node.handle_peer_storage(node_b.node.get_our_node_id(), msg); + }, + _ => panic!("Unexpected event"), + } assert!(node_b.node.get_and_clear_pending_msg_events().is_empty()); check_added_monitors!(node_b, 1); node_b.node.handle_commitment_signed(node_a.node.get_our_node_id(), &as_commitment_signed); @@ -2095,6 +2106,15 @@ pub fn do_commitment_signed_dance(node_a: &Node<'_, '_, '_>, node_b: &Node<'_, ' // Expecting the failure backwards event to the previous hop (not `node_b`) assert_eq!(number_of_msg_events, 1); } else { + let events = node_a.node.get_and_clear_pending_msg_events(); + assert!(events.len() == 1); + match events.get(0).unwrap() { + MessageSendEvent::SendPeerStorageMessage { ref node_id, ref msg } => { + assert_eq!(*node_id, node_b.node.get_our_node_id()); + node_b.node.handle_peer_storage(node_a.node.get_our_node_id(), msg); + }, + _ => panic!("Unexpected event"), + } assert!(node_a.node.get_and_clear_pending_msg_events().is_empty()); } } @@ -3574,6 +3594,12 @@ macro_rules! get_chan_reestablish_msgs { } else if let MessageSendEvent::SendChannelAnnouncement { ref node_id, ref msg, ..
} = msg { assert_eq!(*node_id, $dst_node.node.get_our_node_id()); announcements.insert(msg.contents.short_channel_id); + } else if let MessageSendEvent::SendPeerStorageMessage { ref node_id, ref msg } = msg { + $dst_node.node.handle_peer_storage($src_node.node.get_our_node_id(), msg); + assert_eq!(*node_id, $dst_node.node.get_our_node_id()); + } else if let MessageSendEvent::SendYourPeerStorageMessage { ref node_id, ref msg } = msg { + $dst_node.node.handle_your_peer_storage($src_node.node.get_our_node_id(), msg); + assert_eq!(*node_id, $dst_node.node.get_our_node_id()); } else { panic!("Unexpected event") } diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 9f544d94520..fb0d3511a24 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -423,6 +423,10 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { fn release_pending_monitor_events(&self) -> Vec<(OutPoint, ChannelId, Vec, Option)> { return self.chain_monitor.release_pending_monitor_events(); } + + fn get_stub_cids_with_counterparty(&self, counterparty_node_id: PublicKey) -> Vec { + return self.chain_monitor.get_stub_cids_with_counterparty(counterparty_node_id); + } } #[cfg(test)] @@ -1195,6 +1199,10 @@ impl NodeSigner for TestNodeSigner { unreachable!() } + fn get_peer_storage_key(&self) -> [u8;32] { + unreachable!() + } + fn get_node_id(&self, recipient: Recipient) -> Result { let node_secret = match recipient { Recipient::Node => Ok(&self.node_secret), @@ -1271,6 +1279,10 @@ impl NodeSigner for TestKeysInterface { self.backing.sign_invoice(invoice, recipient) } + fn get_peer_storage_key(&self) -> [u8;32] { + self.backing.get_peer_storage_key() + } + fn sign_bolt12_invoice_request( &self, invoice_request: &UnsignedInvoiceRequest ) -> Result { From 44483f8f40c6c11e2f2640a126348088ac2176f2 Mon Sep 17 00:00:00 2001 From: Aditya Sharma Date: Sun, 14 Jul 2024 03:45:03 +0530 Subject: [PATCH 08/11] lightning: Handle your_peer_storage from our peers. 
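When a peer sends us `your_peer_storage`, decrypt the blob with our
peer-storage key, deserialise it into `OurPeerStorage`, and compare the
revocation state of each channel it describes with our local view, halting
the node if the peer proves we have lost channel state.

A rough sketch of the core staleness check (illustrative; it mirrors the
code added below, where a smaller `min_seen_secret` means more revocation
secrets seen, i.e. newer state):

    // If the backup our peer returned has seen more revocation secrets
    // than our local channel state, we must have lost data and it is not
    // safe to keep operating this channel.
    if chan.context.get_commitment_secret().get_min_seen_secret()
        > ps_channel.get_min_seen_secret()
    {
        panic!("Lost channel state for channel {}. Use the FundRecoverer to \
            initiate a force close and sweep the funds.", ps_channel.channel_id);
    }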
--- lightning/src/ln/channelmanager.rs | 60 +++++++++++++++++++++++++++++- 1 file changed, 59 insertions(+), 1 deletion(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 9bc236e5407..b8ed7a093d8 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -57,6 +57,8 @@ use crate::ln::features::Bolt11InvoiceFeatures; use crate::routing::router::{BlindedTail, InFlightHtlcs, Path, Payee, PaymentParameters, Route, RouteParameters, Router}; use crate::ln::onion_payment::{check_incoming_htlc_cltv, create_recv_pending_htlc_info, create_fwd_pending_htlc_info, decode_incoming_update_add_htlc_onion, InboundHTLCErr, NextPacketDetails}; use crate::ln::msgs; +use crate::ln::channel_keys::RevocationBasepoint; +use crate::ln::chan_utils::{make_funding_redeemscript, ChannelTransactionParameters, CounterpartyChannelTransactionParameters, ChannelPublicKeys}; use crate::ln::onion_utils; use crate::ln::onion_utils::{HTLCFailReason, INVALID_ONION_BLINDING}; use crate::ln::msgs::{ChannelMessageHandler, DecodeError, LightningError}; @@ -77,8 +79,8 @@ use crate::offers::static_invoice::StaticInvoice; use crate::onion_message::async_payments::{AsyncPaymentsMessage, HeldHtlcAvailable, ReleaseHeldHtlc, AsyncPaymentsMessageHandler}; use crate::onion_message::messenger::{Destination, MessageRouter, Responder, ResponseInstruction, MessageSendInstructions}; use crate::onion_message::offers::{OffersMessage, OffersMessageHandler}; -use crate::sign::{EntropySource, NodeSigner, Recipient, SignerProvider}; use crate::sign::ecdsa::EcdsaChannelSigner; +use crate::sign::{EntropySource, ChannelSigner, NodeSigner, Recipient, SignerProvider}; use crate::util::config::{UserConfig, ChannelConfig, ChannelConfigUpdate}; use crate::util::wakers::{Future, Notifier}; use crate::util::scid_utils::fake_scid; @@ -7928,6 +7930,60 @@ where peer_state.peer_storage = msg.data.clone(); } + fn internal_your_peer_storage(&self, counterparty_node_id: &PublicKey, msg: &msgs::YourPeerStorageMessage) { + let logger = WithContext::from(&self.logger, Some(*counterparty_node_id), None, None); + if msg.data.len() < 16 { + log_debug!(logger, "Invalid YourPeerStorage received from {}", log_pubkey!(counterparty_node_id)); + return; + } + + let mut res = vec![0; msg.data.len() - 16]; + { + let our_peer_storage = self.our_peer_storage.read().unwrap(); + match our_peer_storage.decrypt_our_peer_storage(&mut res, msg.data.as_slice(), self.our_peerstorage_encryption_key) { + Ok(()) => { + // Decryption successful, the plaintext is now stored in `res` + log_debug!(logger, "Received a peer storage from peer {}", log_pubkey!(counterparty_node_id)); + } + Err(_) => { + log_debug!(logger, "Invalid YourPeerStorage received from {}", log_pubkey!(counterparty_node_id)); + return; + } + } + } + + let our_peer_storage = ::read(&mut ::bitcoin::io::Cursor::new(res)).unwrap(); + let per_peer_state = self.per_peer_state.read().unwrap(); + + for ps_channel in our_peer_storage.get_channels() { + let peer_state_mutex = match per_peer_state.get(&ps_channel.counterparty_node_id) { + Some(mutex) => mutex, + None => { + log_debug!(logger, "Not able to find peer_state for the counterparty {}, channelId {}", log_pubkey!(ps_channel.counterparty_node_id), ps_channel.channel_id); + panic!("Found a missing channel {} through peer storage", ps_channel.channel_id); + } + }; + + let peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &*peer_state_lock; + + match 
peer_state.channel_by_id.get(&ps_channel.channel_id) { + Some(ChannelPhase::Funded(chan)) => { + if chan.context.get_commitment_secret().get_min_seen_secret() > ps_channel.get_min_seen_secret() { + panic!("Lost channel state for channel {}. + Received peer storage with a more recent state than what our node had. + Use the FundRecoverer to initiate a force close and sweep the funds.", ps_channel.channel_id); + } + }, + Some(_) => {} + None => { + panic!("Found a missing channel {} through peer storage", ps_channel.channel_id); + } + } + } + + } + fn internal_funding_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> { let best_block = *self.best_block.read().unwrap(); let per_peer_state = self.per_peer_state.read().unwrap(); @@ -10586,6 +10642,8 @@ where } fn handle_your_peer_storage(&self, counterparty_node_id: PublicKey, msg: &msgs::YourPeerStorageMessage) { + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || NotifyOption::SkipPersistNoEvents); + self.internal_your_peer_storage(&counterparty_node_id, msg); } fn handle_channel_ready(&self, counterparty_node_id: PublicKey, msg: &msgs::ChannelReady) { From 1ee4c6e0825deb963546b26751bb02366d602eca Mon Sep 17 00:00:00 2001 From: Aditya Sharma Date: Sat, 7 Sep 2024 00:22:40 +0530 Subject: [PATCH 09/11] channelmanager: Create FundRecoverer to take our node in offline mode so that we can just send a BogusChannelReestablish and close all the StubChannelMonitors and sweep the funds from the events. --- lightning/src/chain/chainmonitor.rs | 173 +++--- lightning/src/chain/channelmonitor.rs | 26 +- lightning/src/chain/mod.rs | 9 - lightning/src/events/mod.rs | 29 +- lightning/src/ln/fundrecoverer.rs | 813 ++++++++++++++++++++++++++ lightning/src/ln/mod.rs | 2 +- lightning/src/util/test_utils.rs | 4 - 7 files changed, 951 insertions(+), 105 deletions(-) create mode 100644 lightning/src/ln/fundrecoverer.rs diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 092738d1d80..fc6ec91bf3f 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -165,8 +165,8 @@ pub trait Persist { fn archive_persisted_channel(&self, channel_funding_outpoint: OutPoint); } -struct MonitorHolder { - monitor: ChannelMonitor, +pub(crate) struct MonitorHolder { + pub(crate) monitor: ChannelMonitor, /// The full set of pending monitor updates for this Channel. /// /// Note that this lock must be held from [`ChannelMonitor::update_monitor`] through to @@ -181,7 +181,7 @@ struct MonitorHolder { /// could cause users to have a full [`ChannelMonitor`] on disk as well as a /// [`ChannelMonitorUpdate`] which was already applied. While this isn't an issue for the /// LDK-provided update-based [`Persist`], it is somewhat surprising for users so we avoid it. - pending_monitor_updates: Mutex>, + pub(crate) pending_monitor_updates: Mutex>, } impl MonitorHolder { @@ -195,8 +195,8 @@ impl MonitorHolder { /// Note that this holds a mutex in [`ChainMonitor`] and may block other events until it is /// released. 
pub struct LockedChannelMonitor<'a, ChannelSigner: EcdsaChannelSigner> { - lock: RwLockReadGuard<'a, HashMap>>, - funding_txo: OutPoint, + pub(crate) lock: RwLockReadGuard<'a, HashMap>>, + pub(crate) funding_txo: OutPoint, } impl Deref for LockedChannelMonitor<'_, ChannelSigner> { @@ -247,73 +247,19 @@ pub struct ChainMonitor ChainMonitor -where C::Target: chain::Filter, - T::Target: BroadcasterInterface, - F::Target: FeeEstimator, - L::Target: Logger, - P::Target: Persist, -{ - /// Dispatches to per-channel monitors, which are responsible for updating their on-chain view - /// of a channel and reacting accordingly based on transactions in the given chain data. See - /// [`ChannelMonitor::block_connected`] for details. Any HTLCs that were resolved on chain will - /// be returned by [`chain::Watch::release_pending_monitor_events`]. - /// - /// Calls back to [`chain::Filter`] if any monitor indicated new outputs to watch. Subsequent - /// calls must not exclude any transactions matching the new outputs nor any in-block - /// descendants of such transactions. It is not necessary to re-fetch the block to obtain - /// updated `txdata`. - /// - /// Calls which represent a new blockchain tip height should set `best_height`. - fn process_chain_data(&self, header: &Header, best_height: Option, txdata: &TransactionData, process: FN) +pub(crate) fn update_monitor_with_chain_data_util ( + persister: &P, chain_source: &Option, logger: &L, header: &Header, best_height: Option, txdata: &TransactionData, process: FN, funding_outpoint: &OutPoint, + monitor_state: &MonitorHolder, channel_count: usize, + ) -> Result<(), ()> where - FN: Fn(&ChannelMonitor, &TransactionData) -> Vec, + C::Target: chain::Filter, + FN: Fn(&ChannelMonitor, &TransactionData) -> Vec, + P::Target: Persist, + L::Target: Logger, + ChannelSigner: EcdsaChannelSigner, { - let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; - let funding_outpoints = hash_set_from_iter(self.monitors.read().unwrap().keys().cloned()); - let channel_count = funding_outpoints.len(); - for funding_outpoint in funding_outpoints.iter() { - let monitor_lock = self.monitors.read().unwrap(); - if let Some(monitor_state) = monitor_lock.get(funding_outpoint) { - if self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state, channel_count).is_err() { - // Take the monitors lock for writing so that we poison it and any future - // operations going forward fail immediately. - core::mem::drop(monitor_lock); - let _poison = self.monitors.write().unwrap(); - log_error!(self.logger, "{}", err_str); - panic!("{}", err_str); - } - } - } - - // do some followup cleanup if any funding outpoints were added in between iterations - let monitor_states = self.monitors.write().unwrap(); - for (funding_outpoint, monitor_state) in monitor_states.iter() { - if !funding_outpoints.contains(funding_outpoint) { - if self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state, channel_count).is_err() { - log_error!(self.logger, "{}", err_str); - panic!("{}", err_str); - } - } - } - - if let Some(height) = best_height { - // If the best block height is being updated, update highest_chain_height under the - // monitors write lock. 
- let old_height = self.highest_chain_height.load(Ordering::Acquire); - let new_height = height as usize; - if new_height > old_height { - self.highest_chain_height.store(new_height, Ordering::Release); - } - } - } - - fn update_monitor_with_chain_data( - &self, header: &Header, best_height: Option, txdata: &TransactionData, process: FN, funding_outpoint: &OutPoint, - monitor_state: &MonitorHolder, channel_count: usize, - ) -> Result<(), ()> where FN: Fn(&ChannelMonitor, &TransactionData) -> Vec { let monitor = &monitor_state.monitor; - let logger = WithChannelMonitor::from(&self.logger, &monitor, None); + let logger = WithChannelMonitor::from(logger, &monitor, None); let mut txn_outputs = process(monitor, txdata); @@ -338,7 +284,7 @@ where C::Target: chain::Filter, // `ChannelMonitorUpdate` after a channel persist for a channel with the same // `latest_update_id`. let _pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap(); - match self.persister.update_persisted_channel(*funding_outpoint, None, monitor) { + match persister.update_persisted_channel(*funding_outpoint, None, monitor) { ChannelMonitorUpdateStatus::Completed => log_trace!(logger, "Finished syncing Channel Monitor for channel {} for block-data", log_funding_info!(monitor) @@ -354,7 +300,7 @@ where C::Target: chain::Filter, // Register any new outputs with the chain source for filtering, storing any dependent // transactions from within the block that previously had not been included in txdata. - if let Some(ref chain_source) = self.chain_source { + if let Some(ref chain_source_ref) = chain_source { let block_hash = header.block_hash(); for (txid, mut outputs) in txn_outputs.drain(..) { for (idx, output) in outputs.drain(..) { @@ -365,13 +311,85 @@ where C::Target: chain::Filter, script_pubkey: output.script_pubkey, }; log_trace!(logger, "Adding monitoring for spends of outpoint {} to the filter", output.outpoint); - chain_source.register_output(output); + chain_source_ref.register_output(output); } } } Ok(()) } +pub(crate) fn process_chain_data_util(persister: &P, chain_source: &Option, + logger: &L, monitors: &RwLock>>, highest_chain_height: &AtomicUsize, + header: &Header, best_height: Option, txdata: &TransactionData, process: FN) +where + FN: Fn(&ChannelMonitor, &TransactionData) -> Vec, + L::Target: Logger, + P::Target: Persist, + C::Target: chain::Filter, +{ + let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; + let funding_outpoints = hash_set_from_iter(monitors.read().unwrap().keys().cloned()); + let channel_count = funding_outpoints.len(); + for funding_outpoint in funding_outpoints.iter() { + let monitor_lock = monitors.read().unwrap(); + if let Some(monitor_state) = monitor_lock.get(funding_outpoint) { + if update_monitor_with_chain_data_util(persister, chain_source, logger, header, best_height, txdata, &process, funding_outpoint, &monitor_state, channel_count).is_err() { + // Take the monitors lock for writing so that we poison it and any future + // operations going forward fail immediately. 
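+ // (A panic while the write guard below is held poisons the lock, so any
+ // later attempt to take it fails fast rather than operating on bad state.)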
+ core::mem::drop(monitor_lock); + let _poison = monitors.write().unwrap(); + log_error!(logger, "{}", err_str); + panic!("{}", err_str); + } + } + } + + // do some followup cleanup if any funding outpoints were added in between iterations + let monitor_states = monitors.write().unwrap(); + for (funding_outpoint, monitor_state) in monitor_states.iter() { + if !funding_outpoints.contains(funding_outpoint) { + if update_monitor_with_chain_data_util(persister, chain_source, logger, header, best_height, txdata, &process, funding_outpoint, &monitor_state, channel_count).is_err() { + log_error!(logger, "{}", err_str); + panic!("{}", err_str); + } + } + } + + if let Some(height) = best_height { + // If the best block height is being updated, update highest_chain_height under the + // monitors write lock. + let old_height = highest_chain_height.load(Ordering::Acquire); + let new_height = height as usize; + if new_height > old_height { + highest_chain_height.store(new_height, Ordering::Release); + } + } +} +impl ChainMonitor +where C::Target: chain::Filter, + T::Target: BroadcasterInterface, + F::Target: FeeEstimator, + L::Target: Logger, + P::Target: Persist, +{ + /// Dispatches to per-channel monitors, which are responsible for updating their on-chain view + /// of a channel and reacting accordingly based on transactions in the given chain data. See + /// [`ChannelMonitor::block_connected`] for details. Any HTLCs that were resolved on chain will + /// be returned by [`chain::Watch::release_pending_monitor_events`]. + /// + /// Calls back to [`chain::Filter`] if any monitor indicated new outputs to watch. Subsequent + /// calls must not exclude any transactions matching the new outputs nor any in-block + /// descendants of such transactions. It is not necessary to re-fetch the block to obtain + /// updated `txdata`. + /// + /// Calls which represent a new blockchain tip height should set `best_height`. + fn process_chain_data(&self, header: &Header, best_height: Option, txdata: &TransactionData, process: FN) + where + FN: Fn(&ChannelMonitor, &TransactionData) -> Vec, + { + process_chain_data_util(&self.persister, &self.chain_source, &self.logger, &self.monitors, &self.highest_chain_height, header, best_height, txdata, process); + } + /// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels. 
/// /// When an optional chain source implementing [`chain::Filter`] is provided, the chain monitor @@ -750,17 +768,6 @@ where C::Target: chain::Filter, L::Target: Logger, P::Target: Persist, { - fn get_stub_cids_with_counterparty(&self, counterparty_node_id: PublicKey) -> Vec { - let stub_monitors = self.stub_monitors.read().unwrap(); - let mut stubs = vec![]; - for (_, mon) in stub_monitors.iter() { - if mon.get_counterparty_node_id() == Some(counterparty_node_id) { - stubs.push(mon.channel_id()); - } - } - stubs - } - fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor) -> Result { let logger = WithChannelMonitor::from(&self.logger, &monitor, None); let mut monitors = self.monitors.write().unwrap(); diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index c66ca5ecd07..572281fb010 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -1440,6 +1440,13 @@ impl ChannelMonitor { }) } + pub(crate) fn merge_commitment_secret(&mut self, monitor: ChannelMonitor) { + if self.get_min_seen_secret() > monitor.get_min_seen_secret() { + let inner = monitor.inner.lock().unwrap(); + self.inner.lock().unwrap().commitment_secrets = inner.commitment_secrets.clone(); + } + } + /// Returns a [`ChannelMonitor`] built from a [`StubChannelMonitor`] and the other /// information needed to sweep funds and create penalty transactions. pub(crate) fn new_stub(secp_ctx: Secp256k1, stub_channel: &StubChannelMonitor, keys: Signer, channel_parameters: ChannelTransactionParameters, funding_info_scriptbuf: ScriptBuf, destination_script: ScriptBuf) -> ChannelMonitor { @@ -1689,6 +1696,10 @@ impl ChannelMonitor { } } + pub fn update_latest_state_from_new_stubmonitor(&self, stub: &StubChannelMonitor) { + self.inner.lock().unwrap().update_latest_state_from_new_stubmonitor(stub); + } + /// Get the list of HTLCs who's status has been updated on chain. This should be called by /// ChannelManager via [`chain::Watch::release_pending_monitor_events`]. pub fn get_and_clear_pending_monitor_events(&self) -> Vec { @@ -3532,13 +3543,14 @@ impl ChannelMonitorImpl { self.current_holder_commitment_number } - /// Updates the [`StubChannelMonitor`] when we receive newer - /// peer storage from our peer. This shouldn't be called through [`ChannelMonitor`]. - fn update_latest_state_from_new_stubchannelmonitor(&mut self, stub: &StubChannelMonitor) { - let inner = stub.inner.lock().unwrap(); - self.commitment_secrets = inner.commitment_secrets.clone(); - self.counterparty_claimable_outpoints = inner.counterparty_claimable_outpoints.clone(); - self.their_cur_per_commitment_points = inner.their_cur_per_commitment_points.clone(); + /// Updates the [`ChannelMonitor`] when we receive newer + /// peer storage from our peer.
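+ /// Only the revocation secrets, the counterparty-claimable-outpoints map
+ /// and the counterparty's current per-commitment points are replaced;
+ /// the rest of the monitor state is left untouched.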
+ fn update_latest_state_from_new_stubmonitor(&mut self, stub: &StubChannelMonitor) { + let mut latest_state = new_hash_map(); + latest_state.insert(stub.latest_state.unwrap(), Vec::new()); + self.commitment_secrets = stub.commitment_secrets.clone(); + self.counterparty_claimable_outpoints = latest_state; + self.their_cur_per_commitment_points = stub.their_cur_per_commitment_points.clone(); } /// Attempts to claim a counterparty commitment transaction's outputs using the revocation key and diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs index 0e4c5e12341..36b1ce57309 100644 --- a/lightning/src/chain/mod.rs +++ b/lightning/src/chain/mod.rs @@ -305,15 +305,6 @@ pub trait Watch { /// For details on asynchronous [`ChannelMonitor`] updating and returning /// [`MonitorEvent::Completed`] here, see [`ChannelMonitorUpdateStatus::InProgress`]. fn release_pending_monitor_events(&self) -> Vec<(OutPoint, ChannelId, Vec, Option)>; - - /// Retrieves a list of channel IDs for [`StubChannelMonitor`] associated with a specific counterparty node ID. - /// - /// This function searches through the collection of [`StubChannelMonitor`] and collects the channel IDs - /// of those monitors that have the specified counterparty node ID. - /// - /// This is used by [`FundRecoverer`] to fetch all the [`ChannelId`] with a peer that needs recovery so that we can send them - /// `BogusChannelReestablish`. - fn get_stub_cids_with_counterparty(&self, counterparty_node_id: PublicKey) -> Vec; } /// The `Filter` trait defines behavior for indicating chain activity of interest pertaining to diff --git a/lightning/src/events/mod.rs b/lightning/src/events/mod.rs index 24748a096b3..91e65d3fb0d 100644 --- a/lightning/src/events/mod.rs +++ b/lightning/src/events/mod.rs @@ -20,7 +20,7 @@ pub use bump_transaction::BumpTransactionEvent; use crate::blinded_path::message::OffersContext; use crate::blinded_path::payment::{Bolt12OfferContext, Bolt12RefundContext, PaymentContext, PaymentContextRef}; -use crate::chain::transaction; +use crate::chain::{transaction, BestBlock}; use crate::ln::channelmanager::{InterceptId, PaymentId, RecipientOnionFields}; use crate::ln::channel::FUNDING_CONF_DEADLINE_BLOCKS; use crate::ln::features::ChannelTypeFeatures; @@ -2298,6 +2298,13 @@ impl MaybeReadable for Event { } } +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum RecoveryEvent { + RescanBlock { + rescan_from: BestBlock + }, +} + /// An event generated by ChannelManager which indicates a message should be sent to a peer (or /// broadcast to most peers). /// These events are handled by PeerManager::process_events if you are using a PeerManager. @@ -2647,6 +2654,10 @@ pub trait EventsProvider { fn process_pending_events(&self, handler: H) where H::Target: EventHandler; } +pub trait RecoverEventsProvider: EventsProvider { + fn process_pending_recovery_events(&self, handler:RH) where RH::Target: RecoveryHandler; +} + /// An error type that may be returned to LDK in order to safely abort event handling if it can't /// currently succeed (e.g., due to a persistence failure). 
/// @@ -2678,3 +2689,19 @@ impl EventHandler for Arc { self.deref().handle_event(event) } } + +pub trait RecoveryHandler { + fn handle_recovery_event(&self, event: RecoveryEvent) -> Result<(), ReplayEvent>; +} + +impl RecoveryHandler for F where F: Fn(RecoveryEvent) -> Result<(), ReplayEvent> { + fn handle_recovery_event(&self, event: RecoveryEvent) -> Result<(), ReplayEvent> { + self(event) + } +} + +impl RecoveryHandler for Arc { + fn handle_recovery_event(&self, event: RecoveryEvent) -> Result<(), ReplayEvent> { + self.deref().handle_recovery_event(event) + } +} \ No newline at end of file diff --git a/lightning/src/ln/fundrecoverer.rs b/lightning/src/ln/fundrecoverer.rs new file mode 100644 index 00000000000..e82db539984 --- /dev/null +++ b/lightning/src/ln/fundrecoverer.rs @@ -0,0 +1,813 @@ +use bitcoin::constants::ChainHash; + +use crate::chain; +use crate::events::{MessageSendEvent, MessageSendEventsProvider, RecoveryHandler}; +use crate::ln::channelmanager::{ + provided_init_features, provided_node_features, ChainParameters, PeerState, +}; +use crate::ln::features::{ChannelTypeFeatures, InitFeatures, NodeFeatures}; +use crate::ln::msgs; +use crate::ln::msgs::ChannelMessageHandler; + +use crate::ln::types::ChannelId; +use bitcoin::block::Header; +use bitcoin::hash_types::{BlockHash, Txid}; +use bitcoin::hashes::Hash; +use bitcoin::secp256k1; +use bitcoin::secp256k1::Secp256k1; +use bitcoin::secp256k1::{PublicKey, SecretKey}; + +use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; +use crate::chain::chainmonitor::{ + process_chain_data_util, LockedChannelMonitor, MonitorHolder, Persist, +}; +use crate::chain::channelmonitor::{ + ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateStep, TransactionOutputs, + WithChannelMonitor, STUB_CHANNEL_UPDATE_IDENTIFIER, +}; +use crate::chain::transaction::{OutPoint, TransactionData}; +use crate::chain::{BestBlock, ChannelMonitorUpdateStatus, Confirm, Filter, Watch, WatchedOutput}; +use crate::crypto::chacha20poly1305rfc::ChaCha20Poly1305RFC; + +use crate::events::{self, Event, EventHandler, RecoveryEvent, ReplayEvent}; +use crate::ln::chan_utils::{ + make_funding_redeemscript, ChannelPublicKeys, ChannelTransactionParameters, + CounterpartyChannelTransactionParameters, +}; +use crate::ln::channel_keys::RevocationBasepoint; +use crate::ln::our_peer_storage::{OurPeerStorage, StubChannelMonitor}; +use crate::sign::ecdsa::EcdsaChannelSigner; +use crate::sign::{EntropySource, NodeSigner, SignerProvider}; +use crate::sync::RwLock; +use crate::util::config::UserConfig; +use crate::util::logger::{Logger, WithContext}; +use crate::util::ser::Readable; +use crate::util::wakers::Notifier; +use core::sync::atomic::{AtomicUsize, Ordering}; + +use crate::prelude::*; +use crate::sync::{FairRwLock, Mutex}; +use core::cell::RefCell; +use core::ops::Deref; + +// Re-export this for use in the public API. +pub use crate::ln::outbound_payment::{ + Bolt12PaymentError, PaymentSendFailure, ProbeSendFailure, RecipientOnionFields, Retry, + RetryableSendFailure, +}; + +/// This works as a mock [`ChannelMessageHandler`] it is used mainly when a user wants to run their node in +/// offline mode i.e. This node won't communicate with any peer except sending a BogusChannelReestablish +/// for all the [`StubChannelMonitors`] being tracked by the [`ChainMonitor`]. +/// +/// [`FundRecoverer`] is parameterized by a number of components to achieve this. 
+/// - [`chain::Watch`] (typically [`ChainMonitor`]) for on-chain monitoring and enforcement of each +/// channel +/// - [`SignerProvider`] for providing signers whose operations are scoped to individual channels +/// - [`Logger`] for logging operational information of varying degrees +/// +/// Additionally, it implements the following traits: +/// - [`ChannelMessageHandler`] to handle off-chain channel activity from peers +/// - [`MessageSendEventsProvider`] to similarly send such messages to peers +/// +pub struct FundRecoverer< + ChannelSigner: EcdsaChannelSigner, + C: Deref, + SP: Deref, + L: Deref, + NS: Deref, + ES: Deref, + P: Deref, + T: Deref, + F: Deref, +> where + SP::Target: SignerProvider, + NS::Target: NodeSigner, + L::Target: Logger, + ES::Target: EntropySource, + C::Target: chain::Filter, + P::Target: Persist, + T::Target: BroadcasterInterface, + F::Target: FeeEstimator, +{ + default_configuration: UserConfig, + secp_ctx: Secp256k1, + entropy_source: ES, + chain_source: Option, + persister: P, + broadcaster: T, + fee_estimator: F, + + monitors: RwLock>>, + + highest_chain_height: AtomicUsize, + signer_provider: SP, + node_signer: NS, + chain_hash: ChainHash, + /// The key used to encrypt our peer storage that would be sent to our peers. + our_peerstorage_encryption_key: [u8; 32], + per_peer_state: FairRwLock>>>, + + #[cfg(test)] + pub(super) best_block: RwLock, + #[cfg(not(test))] + best_block: RwLock, + + pending_events: Mutex>, + /// A [`Notifier`] used to wake up the background processor in case we have any [`Event`]s for + /// it to give to users (or [`MonitorEvent`]s for `ChannelManager` to process). + event_notifier: Notifier, + + logger: L, +} + +impl< + ChannelSigner: EcdsaChannelSigner, + C: Deref, + SP: Deref, + L: Deref, + NS: Deref, + ES: Deref, + P: Deref, + T: Deref, + F: Deref, + > events::EventsProvider for FundRecoverer +where + SP::Target: SignerProvider, + NS::Target: NodeSigner, + L::Target: Logger, + ES::Target: EntropySource, + C::Target: chain::Filter, + P::Target: Persist, + T::Target: BroadcasterInterface, + F::Target: FeeEstimator, +{ + /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity. + /// + /// For channels featuring anchor outputs, this method will also process [`BumpTransaction`] + /// events produced from each [`ChannelMonitor`] while there is a balance to claim onchain + /// within each channel. As the confirmation of a commitment transaction may be critical to the + /// safety of funds, we recommend invoking this every 30 seconds, or lower if running in an + /// environment with spotty connections, like on mobile. + /// + /// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in + /// order to handle these events. 
+ /// + /// [`SpendableOutputs`]: events::Event::SpendableOutputs + /// [`BumpTransaction`]: events::Event::BumpTransaction + fn process_pending_events(&self, handler: H) + where + H::Target: EventHandler, + { + for monitor_state in self.monitors.read().unwrap().values() { + match monitor_state.monitor.process_pending_events(&handler) { + Ok(()) => {}, + Err(ReplayEvent()) => { + self.event_notifier.notify(); + }, + } + } + } +} + +impl< + ChannelSigner: EcdsaChannelSigner, + C: Deref, + SP: Deref, + L: Deref, + NS: Deref, + ES: Deref, + P: Deref, + T: Deref, + F: Deref, + > events::RecoverEventsProvider for FundRecoverer +where + SP::Target: SignerProvider, + NS::Target: NodeSigner, + L::Target: Logger, + ES::Target: EntropySource, + C::Target: chain::Filter, + P::Target: Persist, + T::Target: BroadcasterInterface, + F::Target: FeeEstimator, +{ + fn process_pending_recovery_events(&self, handler: RH) + where + RH::Target: events::RecoveryHandler, + { + let mut events = self.pending_events.lock().unwrap(); + for event in events.drain(..) { + match handler.handle_recovery_event(event) { + Ok(()) => {}, + Err(ReplayEvent()) => { + self.event_notifier.notify(); + }, + } + } + } +} + +impl< + ChannelSigner: EcdsaChannelSigner, + C: Deref, + SP: Deref, + L: Deref, + NS: Deref, + ES: Deref, + P: Deref, + T: Deref, + F: Deref, + > MessageSendEventsProvider for FundRecoverer +where + SP::Target: SignerProvider, + NS::Target: NodeSigner, + L::Target: Logger, + ES::Target: EntropySource, + C::Target: chain::Filter, + P::Target: Persist, + T::Target: BroadcasterInterface, + F::Target: FeeEstimator, +{ + fn get_and_clear_pending_msg_events(&self) -> Vec { + let mut pending_events = Vec::new(); + let events = RefCell::new(Vec::new()); + let per_peer_state = self.per_peer_state.read().unwrap(); + for (_cp_id, peer_state_mutex) in per_peer_state.iter() { + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + if peer_state.pending_msg_events.len() > 0 { + pending_events.append(&mut peer_state.pending_msg_events); + } + } + if !pending_events.is_empty() { + events.replace(pending_events); + } + events.into_inner() + } +} + +impl< + ChannelSigner: EcdsaChannelSigner, + C: Deref, + SP: Deref, + L: Deref, + NS: Deref, + ES: Deref, + P: Deref, + T: Deref, + F: Deref, + > FundRecoverer +where + SP::Target: SignerProvider, + NS::Target: NodeSigner, + L::Target: Logger, + ES::Target: EntropySource, + C::Target: chain::Filter, + P::Target: Persist, + T::Target: BroadcasterInterface, + F::Target: FeeEstimator, +{ + /// Creates a new instance of `FundRecoverer`. + /// This function initializes a `FundRecoverer` with the provided `chain_monitor`, + /// `logger`, configuration, and chain parameters. The `FundRecoverer` is set up with + /// the default configuration and a chain hash derived from the genesis block of the + /// specified network. 
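+ ///
+ /// A minimal construction sketch (names are illustrative; any implementations
+ /// of the listed traits will do, and passing `None` disables chain filtering):
+ ///
+ /// `let recoverer = FundRecoverer::new(node_signer, logger, config, params,
+ /// signer_provider, entropy_source, None, persister, fee_estimator, broadcaster,
+ /// monitors);`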
+ pub fn new( + node_signer: NS, logger: L, config: UserConfig, params: ChainParameters, + signer_provider: SP, entropy_source: ES, chain_source: Option, persister: P, + fee_estimator: F, broadcaster: T, monitors: Vec>, + ) -> Self { + let our_peerstorage_encryption_key = node_signer.get_peer_storage_key(); + let mut secp_ctx = Secp256k1::new(); + let mut monitor_map = new_hash_map(); + for monitor in monitors { + let entry = match monitor_map.entry(monitor.get_funding_txo().0) { + hash_map::Entry::Occupied(_) => { + continue; + }, + hash_map::Entry::Vacant(e) => e, + }; + + if let Some(ref chain_source) = chain_source { + monitor.load_outputs_to_watch(chain_source, &logger); + } + + entry + .insert(MonitorHolder { monitor, pending_monitor_updates: Mutex::new(Vec::new()) }); + } + secp_ctx.seeded_randomize(&entropy_source.get_secure_random_bytes()); + return Self { + default_configuration: config.clone(), + monitors: RwLock::new(monitor_map), + persister, + fee_estimator, + broadcaster, + chain_source, + signer_provider, + entropy_source, + secp_ctx, + highest_chain_height: AtomicUsize::new(0), + best_block: RwLock::new(params.best_block), + node_signer, + our_peerstorage_encryption_key, + pending_events: Mutex::new(Vec::new()), + event_notifier: Notifier::new(), + chain_hash: ChainHash::using_genesis_block(params.network), + per_peer_state: FairRwLock::new(new_hash_map()), + logger, + }; + } + + #[cfg(any(test, feature = "_test_utils"))] + pub fn get_and_clear_pending_events(&self) -> Vec { + use crate::events::EventsProvider; + let events = core::cell::RefCell::new(Vec::new()); + let event_handler = |event: events::Event| Ok(events.borrow_mut().push(event)); + self.process_pending_events(&event_handler); + events.into_inner() + } + + #[cfg(any(test, feature = "_test_utils"))] + pub fn get_and_clear_recovery_pending_events(&self) -> Vec { + use crate::events::RecoverEventsProvider; + let events = core::cell::RefCell::new(Vec::new()); + let event_handler = |event: events::RecoveryEvent| Ok(events.borrow_mut().push(event)); + self.process_pending_recovery_events(&event_handler); + events.into_inner() + } + + /// Decrypt `OurPeerStorage` using the `key`; the result is stored inside `res`. + /// Returns an error if the `cyphertext` is not correct. + fn decrypt_our_peer_storage(&self, res: &mut [u8], cyphertext: &[u8]) -> Result<(), ()> { + let key = self.our_peerstorage_encryption_key; + let n = 0u64; + + let mut nonce = [0; 12]; + nonce[4..].copy_from_slice(&n.to_le_bytes()[..]); + + let mut chacha = ChaCha20Poly1305RFC::new(&key, &nonce, b""); + if chacha + .variable_time_decrypt( + &cyphertext[0..cyphertext.len() - 16], + res, + &cyphertext[cyphertext.len() - 16..], + ) + .is_err() + { + return Err(()); + } + Ok(()) + } + + fn stale_or_missing_channel_monitor(&self, stub_chan: &StubChannelMonitor) -> bool { + let monitor_state = self.monitors.read().unwrap(); + let monitor_holder = monitor_state.get(&stub_chan.funding_outpoint); + + // If the monitor doesn't exist. + if monitor_holder.is_none() { + return true; + } + let monitor = &monitor_holder.unwrap().monitor; + + // If we get updated peer storage for an existing channel. + if monitor.get_latest_update_id() == STUB_CHANNEL_UPDATE_IDENTIFIER + && monitor.get_min_seen_secret() > stub_chan.get_min_seen_secret() + { + monitor.update_latest_state_from_new_stubmonitor(stub_chan); + return false; + } else { + // The existing monitor is stale if it has seen fewer revocation secrets than the stub.
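+ // Returning true tells our caller to rebuild this channel's monitor from
+ // the stub recovered out of peer storage.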
+ monitor.get_min_seen_secret() > stub_chan.get_min_seen_secret() + } + } + + fn watch_dummy(&self, stub_channel_monitor: ChannelMonitor) { + if let Some(ref chain_source) = self.chain_source { + stub_channel_monitor.load_outputs_to_watch(chain_source, &self.logger); + } + + let mut monitors = self.monitors.write().unwrap(); + let entry = match monitors.entry(stub_channel_monitor.get_funding_txo().0) { + hash_map::Entry::Occupied(mut m) => { + log_error!(self.logger, "Failed to add new channel data: channel monitor for given outpoint is already present"); + // If this one isn't stale we need to update the monitor. + let holder = m.get_mut(); + if holder.monitor.get_latest_update_id() != STUB_CHANNEL_UPDATE_IDENTIFIER { + if holder.monitor.get_min_seen_secret() + > stub_channel_monitor.get_min_seen_secret() + { + holder.monitor.merge_commitment_secret(stub_channel_monitor); + } + } + return; + }, + hash_map::Entry::Vacant(e) => e, + }; + + self.pending_events.lock().unwrap().push(RecoveryEvent::RescanBlock { + rescan_from: stub_channel_monitor.current_best_block(), + }); + + let persist_res = self + .persister + .persist_new_channel(stub_channel_monitor.get_funding_txo().0, &stub_channel_monitor); + + match persist_res { + ChannelMonitorUpdateStatus::InProgress => { + log_info!( + self.logger, + "Persistence of new ChannelMonitor for channel {} in progress", + log_funding_info!(stub_channel_monitor) + ); + }, + ChannelMonitorUpdateStatus::Completed => { + log_info!( + self.logger, + "Persistence of new ChannelMonitor for channel {} completed", + log_funding_info!(stub_channel_monitor) + ); + }, + ChannelMonitorUpdateStatus::UnrecoverableError => { + let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; + log_error!(self.logger, "{}", err_str); + panic!("{}", err_str); + }, + } + entry.insert(MonitorHolder { + monitor: stub_channel_monitor, + pending_monitor_updates: Mutex::new(Vec::new()), + }); + } + + fn process_chain_data( + &self, header: &Header, best_height: Option, txdata: &TransactionData, process: FN, + ) where + FN: Fn(&ChannelMonitor, &TransactionData) -> Vec, + { + process_chain_data_util( + &self.persister, + &self.chain_source, + &self.logger, + &self.monitors, + &self.highest_chain_height, + header, + best_height, + txdata, + process, + ); + } + + /// Lists the funding outpoint and channel ID of each [`ChannelMonitor`] being monitored. + /// + /// Note that [`ChannelMonitor`]s are not removed when a channel is closed as they are always + /// monitoring for on-chain state resolutions. + pub fn list_monitors(&self) -> Vec<(OutPoint, ChannelId)> { + self.monitors + .read() + .unwrap() + .iter() + .map(|(outpoint, monitor_holder)| { + let channel_id = monitor_holder.monitor.channel_id(); + (*outpoint, channel_id) + }) + .collect() + } + + /// Gets the [`LockedChannelMonitor`] for a given funding outpoint, returning an `Err` if no + /// such [`ChannelMonitor`] is currently being monitored for. + /// + /// Note that the result holds a mutex over our monitor set, and should not be held + /// indefinitely. 
+ pub fn get_monitor( + &self, funding_txo: OutPoint, + ) -> Result, ()> { + let lock = self.monitors.read().unwrap(); + if lock.get(&funding_txo).is_some() { + Ok(LockedChannelMonitor { lock, funding_txo }) + } else { + Err(()) + } + } +} + +impl< + ChannelSigner: EcdsaChannelSigner, + C: Deref, + SP: Deref, + L: Deref, + NS: Deref, + ES: Deref, + P: Deref, + T: Deref, + F: Deref, + > ChannelMessageHandler for FundRecoverer +where + SP::Target: SignerProvider, + NS::Target: NodeSigner, + L::Target: Logger, + ES::Target: EntropySource, + C::Target: chain::Filter, + P::Target: Persist, + T::Target: BroadcasterInterface, + F::Target: FeeEstimator, +{ + fn handle_open_channel(&self, _their_node_id: PublicKey, _msg: &msgs::OpenChannel) {} + fn handle_accept_channel(&self, _their_node_id: PublicKey, _msg: &msgs::AcceptChannel) {} + fn handle_funding_created(&self, _their_node_id: PublicKey, _msg: &msgs::FundingCreated) {} + fn handle_funding_signed(&self, _their_node_id: PublicKey, _msg: &msgs::FundingSigned) {} + fn handle_channel_ready(&self, _their_node_id: PublicKey, _msg: &msgs::ChannelReady) {} + fn handle_shutdown(&self, _their_node_id: PublicKey, _msg: &msgs::Shutdown) {} + fn handle_closing_signed(&self, _their_node_id: PublicKey, _msg: &msgs::ClosingSigned) {} + fn handle_update_add_htlc(&self, _their_node_id: PublicKey, _msg: &msgs::UpdateAddHTLC) {} + fn handle_update_fulfill_htlc( + &self, _their_node_id: PublicKey, _msg: &msgs::UpdateFulfillHTLC, + ) { + } + fn handle_update_fail_htlc(&self, _their_node_id: PublicKey, _msg: &msgs::UpdateFailHTLC) {} + fn handle_update_fail_malformed_htlc( + &self, _their_node_id: PublicKey, _msg: &msgs::UpdateFailMalformedHTLC, + ) { + } + fn handle_commitment_signed(&self, _their_node_id: PublicKey, _msg: &msgs::CommitmentSigned) {} + fn handle_revoke_and_ack(&self, _their_node_id: PublicKey, _msg: &msgs::RevokeAndACK) {} + fn handle_update_fee(&self, _their_node_id: PublicKey, _msg: &msgs::UpdateFee) {} + fn handle_announcement_signatures( + &self, _their_node_id: PublicKey, _msg: &msgs::AnnouncementSignatures, + ) { + } + fn handle_channel_update(&self, _their_node_id: PublicKey, _msg: &msgs::ChannelUpdate) {} + fn handle_open_channel_v2(&self, _their_node_id: PublicKey, _msg: &msgs::OpenChannelV2) {} + fn handle_accept_channel_v2(&self, _their_node_id: PublicKey, _msg: &msgs::AcceptChannelV2) {} + fn handle_stfu(&self, _their_node_id: PublicKey, _msg: &msgs::Stfu) {} + #[cfg(splicing)] + fn handle_splice_init(&self, _their_node_id: PublicKey, _msg: &msgs::SpliceInit) {} + #[cfg(splicing)] + fn handle_splice_ack(&self, _their_node_id: PublicKey, _msg: &msgs::SpliceAck) {} + #[cfg(splicing)] + fn handle_splice_locked(&self, _their_node_id: PublicKey, _msg: &msgs::SpliceLocked) {} + fn handle_tx_add_input(&self, _their_node_id: PublicKey, _msg: &msgs::TxAddInput) {} + fn handle_tx_add_output(&self, _their_node_id: PublicKey, _msg: &msgs::TxAddOutput) {} + fn handle_tx_remove_input(&self, _their_node_id: PublicKey, _msg: &msgs::TxRemoveInput) {} + fn handle_tx_remove_output(&self, _their_node_id: PublicKey, _msg: &msgs::TxRemoveOutput) {} + fn handle_tx_complete(&self, _their_node_id: PublicKey, _msg: &msgs::TxComplete) {} + fn handle_tx_signatures(&self, _their_node_id: PublicKey, _msg: &msgs::TxSignatures) {} + fn handle_tx_init_rbf(&self, _their_node_id: PublicKey, _msg: &msgs::TxInitRbf) {} + fn handle_tx_ack_rbf(&self, _their_node_id: PublicKey, _msg: &msgs::TxAckRbf) {} + fn handle_tx_abort(&self, _their_node_id: PublicKey, _msg: 
&msgs::TxAbort) {} + fn handle_peer_storage(&self, _their_node_id: PublicKey, _msg: &msgs::PeerStorageMessage) {} + + fn handle_your_peer_storage( + &self, counterparty_node_id: PublicKey, msg: &msgs::YourPeerStorageMessage, + ) { + let logger = WithContext::from(&self.logger, Some(counterparty_node_id), None, None); + if msg.data.len() < 16 { + log_debug!( + logger, + "Invalid YourPeerStorage received from {}", + log_pubkey!(counterparty_node_id) + ); + return; + } + + let mut res = vec![0; msg.data.len() - 16]; + { + match self.decrypt_our_peer_storage(&mut res, msg.data.as_slice()) { + Ok(()) => { + // Decryption successful, the plaintext is now stored in `res` + log_debug!( + logger, + "Received a peer storage from peer {}", + log_pubkey!(counterparty_node_id) + ); + }, + Err(_) => { + log_debug!( + logger, + "Invalid YourPeerStorage received from {}", + log_pubkey!(counterparty_node_id) + ); + return; + }, + } + } + + let our_peer_storage = + ::read(&mut ::bitcoin::io::Cursor::new(res)).unwrap(); + + for ps_channel in our_peer_storage.get_channels() { + if self.stale_or_missing_channel_monitor(ps_channel) { + let mut keys = self.signer_provider.derive_channel_signer( + ps_channel.channel_value_stoshis, + ps_channel.channel_keys_id, + ); + let channel_parameters = ChannelTransactionParameters { + holder_pubkeys: keys.pubkeys().clone(), + is_outbound_from_holder: true, + holder_selected_contest_delay: 66, + counterparty_parameters: Some(CounterpartyChannelTransactionParameters { + pubkeys: ChannelPublicKeys { + funding_pubkey: PublicKey::from_secret_key( + &self.secp_ctx, + &SecretKey::from_slice(&[44; 32]).unwrap(), + ), + revocation_basepoint: RevocationBasepoint::from( + PublicKey::from_secret_key( + &self.secp_ctx, + &SecretKey::from_slice(&[45; 32]).unwrap(), + ), + ), + payment_point: PublicKey::from_secret_key( + &self.secp_ctx, + &SecretKey::from_slice(&[46; 32]).unwrap(), + ), + delayed_payment_basepoint: ps_channel + .counterparty_delayed_payment_base_key, + htlc_basepoint: ps_channel.counterparty_htlc_base_key, + }, + selected_contest_delay: ps_channel.on_counterparty_tx_csv, + }), + funding_outpoint: Some(ps_channel.funding_outpoint), + channel_type_features: ChannelTypeFeatures::only_static_remote_key(), + }; + keys.provide_channel_parameters(&channel_parameters); + let pubkeys = keys.pubkeys().clone(); + let funding_redeemscript = + make_funding_redeemscript(&pubkeys.funding_pubkey, &counterparty_node_id); + let funding_txo_script = funding_redeemscript.to_p2wsh(); + let destination_script = self + .signer_provider + .get_destination_script(ps_channel.channel_keys_id) + .unwrap(); + let monitor = ChannelMonitor::new_stub( + self.secp_ctx.clone(), + ps_channel, + keys, + channel_parameters, + funding_txo_script, + destination_script, + ); + + self.watch_dummy(monitor); + log_debug!( + logger, + "Generating BogusChannelReestablish to force close the channel." 
+ ); + + let per_peer_state = self.per_peer_state.read().unwrap(); + if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) { + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + let pending_msg_events = &mut peer_state.pending_msg_events; + pending_msg_events.push(MessageSendEvent::SendChannelReestablish { + node_id: counterparty_node_id, + msg: msgs::ChannelReestablish { + channel_id: ps_channel.channel_id, + next_local_commitment_number: 0, + next_remote_commitment_number: 0, + your_last_per_commitment_secret: [1u8; 32], + my_current_per_commitment_point: PublicKey::from_slice(&[2u8; 33]) + .unwrap(), + next_funding_txid: None, + }, + }) + } + } + } + } + + fn peer_disconnected(&self, _their_node_id: PublicKey) {} + + fn peer_connected( + &self, counterparty_node_id: PublicKey, init_msg: &msgs::Init, _inbound: bool, + ) -> Result<(), ()> { + let logger = WithContext::from(&self.logger, Some(counterparty_node_id), None, None); + + { + let mut peer_state_lock = self.per_peer_state.write().unwrap(); + match peer_state_lock.entry(counterparty_node_id.clone()) { + hash_map::Entry::Vacant(e) => { + e.insert(Mutex::new(PeerState::new(&init_msg.features))); + }, + hash_map::Entry::Occupied(e) => { + let mut peer_state = e.get().lock().unwrap(); + + debug_assert!(!peer_state.is_connected, "A peer shouldn't be connected twice"); + peer_state.is_connected = true; + }, + } + } + + log_debug!(logger, "Connected to node {}", log_pubkey!(counterparty_node_id)); + Ok(()) + } + + fn handle_channel_reestablish( + &self, _their_node_id: PublicKey, _msg: &msgs::ChannelReestablish, + ) { + } + fn handle_error(&self, _their_node_id: PublicKey, _msg: &msgs::ErrorMessage) {} + fn provided_node_features(&self) -> NodeFeatures { + provided_node_features(&self.default_configuration) + } + fn provided_init_features(&self, _their_node_id: PublicKey) -> InitFeatures { + provided_init_features(&self.default_configuration) + } + fn get_chain_hashes(&self) -> Option> { + Some(vec![self.chain_hash]) + } + + fn message_received(&self) {} +} + +impl< + ChannelSigner: EcdsaChannelSigner, + C: Deref, + SP: Deref, + L: Deref, + NS: Deref, + ES: Deref, + P: Deref, + T: Deref, + F: Deref, + > chain::Confirm for FundRecoverer +where + SP::Target: SignerProvider, + NS::Target: NodeSigner, + L::Target: Logger, + ES::Target: EntropySource, + C::Target: chain::Filter, + P::Target: Persist, + T::Target: BroadcasterInterface, + F::Target: FeeEstimator, +{ + fn transactions_confirmed(&self, header: &Header, txdata: &TransactionData, height: u32) { + log_debug!( + self.logger, + "{} provided transactions confirmed at height {} in block {}", + txdata.len(), + height, + header.block_hash() + ); + + self.process_chain_data(header, None, txdata, |monitor, txdata| { + monitor.transactions_confirmed( + header, + txdata, + height, + &*self.broadcaster, + &*self.fee_estimator, + &self.logger, + ) + }); + // Assume we may have some new events and wake the event processor + self.event_notifier.notify(); + } + + fn transaction_unconfirmed(&self, txid: &Txid) { + log_debug!(self.logger, "Transaction {} reorganized out of chain", txid); + let monitor_states = self.monitors.read().unwrap(); + for monitor_state in monitor_states.values() { + monitor_state.monitor.transaction_unconfirmed( + txid, + &*self.broadcaster, + &*self.fee_estimator, + &self.logger, + ); + } + } + + fn best_block_updated(&self, header: &Header, height: u32) { + log_debug!( + self.logger, + "New best block 
+	fn best_block_updated(&self, header: &Header, height: u32) {
+		log_debug!(
+			self.logger,
+			"New best block {} at height {} provided via best_block_updated",
+			header.block_hash(),
+			height
+		);
+		self.process_chain_data(header, Some(height), &[], |monitor, txdata| {
+			// While in practice there shouldn't be any recursive calls when given empty txdata,
+			// it's still possible if a chain::Filter implementation returns a transaction.
+			debug_assert!(txdata.is_empty());
+			monitor.best_block_updated(
+				header,
+				height,
+				&*self.broadcaster,
+				&*self.fee_estimator,
+				&self.logger,
+			)
+		});
+		// Assume we may have some new events and wake the event processor
+		self.event_notifier.notify();
+	}
+
+	fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
+		let mut txids = Vec::new();
+		let monitor_states = self.monitors.read().unwrap();
+		for monitor_state in monitor_states.values() {
+			txids.append(&mut monitor_state.monitor.get_relevant_txids());
+		}
+
+		txids.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(b.1.cmp(&a.1)));
+		txids.dedup_by_key(|(txid, _, _)| *txid);
+		txids
+	}
+}
diff --git a/lightning/src/ln/mod.rs b/lightning/src/ln/mod.rs
index 2e986579c63..337d9138e94 100644
--- a/lightning/src/ln/mod.rs
+++ b/lightning/src/ln/mod.rs
@@ -30,8 +30,8 @@ pub mod chan_utils;
 pub mod features;
 pub mod script;
 pub mod types;
-pub mod fundrecoverer;
 pub mod our_peer_storage;
+pub mod fundrecoverer;
 
 // TODO: These modules were moved from lightning-invoice and need to be better integrated into this
 // crate now:
diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs
index fb0d3511a24..25740babea2 100644
--- a/lightning/src/util/test_utils.rs
+++ b/lightning/src/util/test_utils.rs
@@ -423,10 +423,6 @@ impl<'a> chain::Watch<TestChannelSigner> for TestChainMonitor<'a> {
 	fn release_pending_monitor_events(&self) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, Option<PublicKey>)> {
 		return self.chain_monitor.release_pending_monitor_events();
 	}
-
-	fn get_stub_cids_with_counterparty(&self, counterparty_node_id: PublicKey) -> Vec<ChannelId> {
-		return self.chain_monitor.get_stub_cids_with_counterparty(counterparty_node_id);
-	}
 }
 
 #[cfg(test)]

From da9085d0d214015116efa4926e83c6d1437167d5 Mon Sep 17 00:00:00 2001
From: Aditya Sharma
Date: Sun, 14 Jul 2024 03:49:48 +0530
Subject: [PATCH 10/11] functional_tests: Add test_peer_storage to confirm that we recover from peer storage and sweep funds correctly.

---
 lightning/src/ln/functional_test_utils.rs |  23 ++++
 lightning/src/ln/functional_tests.rs      | 152 +++++++++++++++++++++-
 lightning/src/util/test_utils.rs          |   4 +
 3 files changed, 175 insertions(+), 4 deletions(-)

diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs
index f08697cae64..a8796c80f28 100644
--- a/lightning/src/ln/functional_test_utils.rs
+++ b/lightning/src/ln/functional_test_utils.rs
@@ -1012,6 +1012,29 @@ macro_rules! get_channel_type_features {
 	}
 }
 
+/// Returns a stub channel monitor given a channel id, making some naive assumptions
+#[macro_export]
+macro_rules! get_stub {
+	($node: expr, $channel_id: expr) => {
+		{
+			use bitcoin::hashes::Hash;
+			let mut stub = None;
+			// Blindly assume the funding output is at vout 0 or 1
+			for index in 0..2 {
+				if let Ok(mon) = $node.chain_monitor.chain_monitor.get_stub_monitor(
+					$crate::chain::transaction::OutPoint {
+						txid: bitcoin::Txid::from_slice(&$channel_id.0[..]).unwrap(), index
+					})
+				{
+					stub = Some(mon);
+					break;
+				}
+			}
+			stub.unwrap()
+		}
+	}
+}
+
 /// Returns a channel monitor given a channel id, making some naive assumptions
 #[macro_export]
 macro_rules! get_monitor {
diff --git a/lightning/src/ln/functional_tests.rs b/lightning/src/ln/functional_tests.rs
index 31346c6b78b..1bc9b35049a 100644
--- a/lightning/src/ln/functional_tests.rs
+++ b/lightning/src/ln/functional_tests.rs
@@ -14,17 +14,20 @@ use crate::chain;
 use crate::chain::{ChannelMonitorUpdateStatus, Confirm, Listen, Watch};
 use crate::chain::chaininterface::LowerBoundedFeeEstimator;
-use crate::chain::channelmonitor;
+use crate::chain::chainmonitor::Persist;
+use crate::chain::{channelmonitor, BestBlock};
 use crate::chain::channelmonitor::{CLOSED_CHANNEL_UPDATE_ID, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY};
 use crate::chain::transaction::OutPoint;
-use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, OutputSpender, SignerProvider};
-use crate::events::{Event, FundingInfo, MessageSendEvent, MessageSendEventsProvider, PathFailure, PaymentPurpose, ClosureReason, HTLCDestination, PaymentFailureReason};
+use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, OutputSpender, SignerProvider, SpendableOutputDescriptor};
+use crate::events::{Event, FundingInfo, RecoveryEvent, MessageSendEvent, MessageSendEventsProvider, PathFailure, PaymentPurpose, ClosureReason, HTLCDestination, PaymentFailureReason};
 use crate::ln::types::{ChannelId, PaymentPreimage, PaymentSecret, PaymentHash};
 use crate::ln::channel::{CONCURRENT_INBOUND_HTLC_FEE_BUFFER, FEE_SPIKE_BUFFER_FEE_INCREASE_MULTIPLE, MIN_AFFORDABLE_HTLC_COUNT, get_holder_selected_channel_reserve_satoshis, OutboundV1Channel, InboundV1Channel, COINBASE_MATURITY, ChannelPhase};
-use crate::ln::channelmanager::{self, PaymentId, RAACommitmentOrder, PaymentSendFailure, RecipientOnionFields, BREAKDOWN_TIMEOUT, ENABLE_GOSSIP_TICKS, DISABLE_GOSSIP_TICKS, MIN_CLTV_EXPIRY_DELTA};
+use crate::ln::channelmanager::{self, ChainParameters, PaymentId, RAACommitmentOrder, PaymentSendFailure, RecipientOnionFields, BREAKDOWN_TIMEOUT, ENABLE_GOSSIP_TICKS, DISABLE_GOSSIP_TICKS, MIN_CLTV_EXPIRY_DELTA};
 use crate::ln::channel::{DISCONNECT_PEER_AWAITING_RESPONSE_TICKS, ChannelError};
 use crate::ln::{chan_utils, onion_utils};
 use crate::ln::chan_utils::{commitment_tx_base_weight, COMMITMENT_TX_WEIGHT_PER_HTLC, OFFERED_HTLC_SCRIPT_WEIGHT, htlc_success_tx_weight, htlc_timeout_tx_weight, HTLCOutputInCommitment};
+use crate::ln::fundrecoverer::FundRecoverer;
 use crate::routing::gossip::{NetworkGraph, NetworkUpdate};
 use crate::routing::router::{Path, PaymentParameters, Route, RouteHop, get_route, RouteParameters};
 use crate::ln::features::{ChannelFeatures, ChannelTypeFeatures, NodeFeatures};
@@ -174,6 +177,147 @@ fn test_funding_exceeds_no_wumbo_limit() {
 	}
 }
 
+#[test]
+fn test_peer_storage() {
+	let chanmon_cfgs = create_chanmon_cfgs(2);
+	let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+	let (persister, chain_monitor);
+	let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+	let nodes_0_deserialized;
+	let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+	let nodes_0_serialized = nodes[0].node.encode();
+
+	let (_a, _b, channel_id, funding_tx) = create_announced_chan_between_nodes(&nodes, 0, 1);
+
+	send_payment(&nodes[0], &vec!(&nodes[1])[..], 1000);
+	send_payment(&nodes[0], &vec!(&nodes[1])[..], 10000);
+	send_payment(&nodes[0], &vec!(&nodes[1])[..], 9999);
+
+	nodes[0].node.peer_disconnected(nodes[1].node.get_our_node_id());
+	nodes[1].node.peer_disconnected(nodes[0].node.get_our_node_id());
+
+	// Reconnect peers
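+	// Reconnecting triggers a fresh exchange of peer storage, giving nodes[1] an up-to-date
+	// backup of nodes[0]'s state before we wipe that state below.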
+	nodes[0].node.peer_connected(nodes[1].node.get_our_node_id(), &msgs::Init {
+		features: nodes[1].node.init_features(), networks: None, remote_network_address: None
+	}, true).unwrap();
+	let reestablish_1 = get_chan_reestablish_msgs!(nodes[0], nodes[1]);
+	assert_eq!(reestablish_1.len(), 1);
+	nodes[1].node.peer_connected(nodes[0].node.get_our_node_id(), &msgs::Init {
+		features: nodes[0].node.init_features(), networks: None, remote_network_address: None
+	}, false).unwrap();
+	let reestablish_2 = get_chan_reestablish_msgs!(nodes[1], nodes[0]);
+	assert_eq!(reestablish_2.len(), 1);
+
+	nodes[0].node.handle_channel_reestablish(nodes[1].node.get_our_node_id(), &reestablish_2[0]);
+	handle_chan_reestablish_msgs!(nodes[0], nodes[1]);
+	nodes[1].node.handle_channel_reestablish(nodes[0].node.get_our_node_id(), &reestablish_1[0]);
+	handle_chan_reestablish_msgs!(nodes[1], nodes[0]);
+
+	nodes[0].node.peer_disconnected(nodes[1].node.get_our_node_id());
+	nodes[1].node.peer_disconnected(nodes[0].node.get_our_node_id());
+
+	// Let's drop the monitor and clear the chain_monitor as well.
+	nodes[0].chain_source.clear_watched_txn_and_outputs();
+	reload_node!(nodes[0], test_default_channel_config(), &nodes_0_serialized, &[], persister, chain_monitor, nodes_0_deserialized);
+	let persister: &dyn Persist<TestChannelSigner> = &chanmon_cfgs[0].persister;
+
+	let fundrecoverer = FundRecoverer::new(
+		node_cfgs[0].keys_manager, node_cfgs[0].logger, test_default_channel_config(),
+		ChainParameters { network: Network::Testnet, best_block: BestBlock::from_network(Network::Testnet) },
+		node_cfgs[0].keys_manager, node_cfgs[0].keys_manager, Some(&chanmon_cfgs[0].chain_source),
+		persister, node_cfgs[0].fee_estimator, node_cfgs[0].tx_broadcaster, Vec::new());
+
+	fundrecoverer.peer_connected(nodes[1].node.get_our_node_id(), &msgs::Init {
+		features: nodes[0].node.init_features(), networks: None, remote_network_address: None
+	}, true).unwrap();
+
+	nodes[1].node.peer_connected(nodes[0].node.get_our_node_id(), &msgs::Init {
+		features: nodes[0].node.init_features(), networks: None, remote_network_address: None
+	}, true).unwrap();
+	let msg_events = nodes[1].node.get_and_clear_pending_msg_events();
+	// We should get three messages: SendYourPeerStorageMessage, SendPeerStorageMessage and
+	// SendChannelReestablish (in no particular order).
+	assert_eq!(msg_events.len(), 3);
+
+	for msg in msg_events {
+		if let MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } = msg {
+			fundrecoverer.handle_channel_reestablish(nodes[1].node.get_our_node_id(), msg);
+			assert_eq!(*node_id, nodes[0].node.get_our_node_id());
+		} else if let MessageSendEvent::SendPeerStorageMessage { ref node_id, ref msg } = msg {
+			fundrecoverer.handle_peer_storage(nodes[1].node.get_our_node_id(), msg);
+			assert_eq!(*node_id, nodes[0].node.get_our_node_id());
+		} else if let MessageSendEvent::SendYourPeerStorageMessage { ref node_id, ref msg } = msg {
+			fundrecoverer.handle_your_peer_storage(nodes[1].node.get_our_node_id(), msg);
+			assert_eq!(*node_id, nodes[0].node.get_our_node_id());
+		} else {
+			panic!("Unexpected event")
+		}
+	}
+
+	let bogus_chan_reestablish = fundrecoverer.get_and_clear_pending_msg_events();
+	assert_eq!(bogus_chan_reestablish.len(), 1);
+
+	match bogus_chan_reestablish[0] {
+		MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
+			assert_eq!(nodes[1].node.get_our_node_id(), *node_id);
+			nodes[1].node.handle_channel_reestablish(nodes[0].node.get_our_node_id(), msg);
+		},
+		_ => panic!("Unexpected event"),
+	}
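+	// Handling the recoverer's stale channel_reestablish should make nodes[1] treat its
+	// counterparty as having lost state and broadcast its latest commitment transaction.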
+	let commitment_tx = {
+		let mut node_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap();
+		assert_eq!(node_txn.len(), 1);
+		node_txn.remove(0)
+	};
+
+	let block = create_dummy_block(nodes[1].best_block_hash(), 42, vec![commitment_tx.clone()]);
+	connect_block(&nodes[1], &block);
+	// Since the fundrecoverer is acting as our chain::Watch, feed it the block as well.
+	let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
+	let height = nodes[0].best_block_info().1 + 1;
+	fundrecoverer.best_block_updated(&block.header, height);
+	fundrecoverer.transactions_confirmed(&block.header, &txdata, height);
+
+	check_closed_broadcast!(nodes[1], true);
+
+	let events = nodes[1].node.get_and_clear_pending_events();
+	assert_eq!(events.len(), 1);
+	match events[0] {
+		Event::ChannelClosed { .. } => {},
+		_ => panic!("Unexpected event"),
+	}
+	let mut dummy_block = create_dummy_block(nodes[1].best_block_hash(), height, Vec::new());
+	for i in 1..CHAN_CONFIRM_DEPTH {
+		let prev_blockhash = dummy_block.header.block_hash();
+		let dummy_txdata: Vec<_> = dummy_block.txdata.iter().enumerate().collect();
+		fundrecoverer.best_block_updated(&dummy_block.header, height + i + 1);
+		fundrecoverer.transactions_confirmed(&dummy_block.header, &dummy_txdata, height + i + 1);
+		dummy_block = create_dummy_block(prev_blockhash, height + i + 1, Vec::new());
+	}
+
+	// Clear the chain source so that the `drop` doesn't panic.
+	nodes[0].chain_source.clear_watched_txn_and_outputs();
+
+	check_added_monitors!(nodes[1], 1);
+
+	for event in fundrecoverer.get_and_clear_pending_events() {
+		match event {
+			Event::SpendableOutputs { mut outputs, channel_id: _ } => {
+				for outp in outputs.drain(..) {
+					match outp {
+						SpendableOutputDescriptor::StaticPaymentOutput(static_payment) => {
+							assert_eq!(static_payment.output.value.to_sat(), commitment_tx.output[0].value.to_sat());
+						},
+						_ => panic!("Unexpected output type"),
+					}
+				}
+			},
+			_ => panic!("Unexpected event"),
+		};
+	}
+}
+
 fn do_test_counterparty_no_reserve(send_from_initiator: bool) {
 	// A peer providing a channel_reserve_satoshis of 0 (or less than our dust limit) is insecure,
 	// but only for them. Because some LSPs do it with some level of trust of the clients (for a
diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs
index 25740babea2..55062c663d3 100644
--- a/lightning/src/util/test_utils.rs
+++ b/lightning/src/util/test_utils.rs
@@ -1431,6 +1431,10 @@ impl TestChainSource {
 		self.watched_outputs.lock().unwrap().remove(&(outpoint, script_pubkey.clone()));
 		self.watched_txn.lock().unwrap().remove(&(outpoint.txid, script_pubkey));
 	}
+	pub fn clear_watched_txn_and_outputs(&self) {
+		self.watched_outputs.lock().unwrap().clear();
+		self.watched_txn.lock().unwrap().clear();
+	}
 }
 
 impl UtxoLookup for TestChainSource {

From fe6e83cf0a28dd1bea6480d3539715c73978b18f Mon Sep 17 00:00:00 2001
From: Aditya Sharma
Date: Wed, 17 Jul 2024 12:45:05 +0530
Subject: [PATCH 11/11] functional_tests: Add test to check that the node recovers funds from a revoked state.

---
 lightning/src/ln/functional_tests.rs | 144 +++++++++++++++++++++++++++
 1 file changed, 144 insertions(+)

diff --git a/lightning/src/ln/functional_tests.rs b/lightning/src/ln/functional_tests.rs
index 1bc9b35049a..fc606d476c0 100644
--- a/lightning/src/ln/functional_tests.rs
+++ b/lightning/src/ln/functional_tests.rs
@@ -4685,6 +4685,150 @@ macro_rules! check_spendable_outputs {
 	}
 }
 
+#[test]
+fn test_peer_storage_on_revoked_txn() {
+	let chanmon_cfgs = create_chanmon_cfgs(2);
+	let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+	let (persister, chain_monitor);
+
+	let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+	let nodes_0_deserialized;
+	let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+	let nodes_0_serialized = nodes[0].node.encode();
+
+	let (_a, _b, channel_id, funding_tx) = create_announced_chan_between_nodes(&nodes, 1, 0);
+
+	send_payment(&nodes[1], &vec!(&nodes[0])[..], 10000);
+	send_payment(&nodes[1], &vec!(&nodes[0])[..], 10000);
+
+	let revoked_local_txn = get_local_commitment_txn!(nodes[1], channel_id);
+	assert_eq!(revoked_local_txn[0].input.len(), 1);
+	assert_eq!(revoked_local_txn[0].input[0].previous_output.txid, funding_tx.txid());
+
+	// Revoke the state captured above by routing another payment.
+	send_payment(&nodes[1], &vec!(&nodes[0])[..], 10000);
+
+	nodes[0].node.peer_disconnected(nodes[1].node.get_our_node_id());
+	nodes[1].node.peer_disconnected(nodes[0].node.get_our_node_id());
+
+	// Let's drop the monitor and clear the chain_monitor as well.
+	nodes[0].chain_source.clear_watched_txn_and_outputs();
+	reload_node!(nodes[0], test_default_channel_config(), &nodes_0_serialized, &[], persister, chain_monitor, nodes_0_deserialized);
+	let persister: &dyn Persist<TestChannelSigner> = &chanmon_cfgs[0].persister;
+
+	let fundrecoverer = FundRecoverer::new(
+		node_cfgs[0].keys_manager, node_cfgs[0].logger, test_default_channel_config(),
+		ChainParameters { network: Network::Testnet, best_block: BestBlock::from_network(Network::Testnet) },
+		node_cfgs[0].keys_manager, node_cfgs[0].keys_manager, Some(&chanmon_cfgs[0].chain_source),
+		persister, node_cfgs[0].fee_estimator, node_cfgs[0].tx_broadcaster, Vec::new());
+
+	fundrecoverer.peer_connected(nodes[1].node.get_our_node_id(), &msgs::Init {
+		features: nodes[0].node.init_features(), networks: None, remote_network_address: None
+	}, true).unwrap();
+
+	nodes[1].node.peer_connected(nodes[0].node.get_our_node_id(), &msgs::Init {
+		features: nodes[0].node.init_features(), networks: None, remote_network_address: None
+	}, true).unwrap();
+	let msg_events = nodes[1].node.get_and_clear_pending_msg_events();
+	// We should get three messages: SendYourPeerStorageMessage, SendPeerStorageMessage and
+	// SendChannelReestablish (in no particular order).
+	assert_eq!(msg_events.len(), 3);
+
+	for msg in msg_events {
+		if let MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } = msg {
+			fundrecoverer.handle_channel_reestablish(nodes[1].node.get_our_node_id(), msg);
+			assert_eq!(*node_id, nodes[0].node.get_our_node_id());
+		} else if let MessageSendEvent::SendPeerStorageMessage { ref node_id, ref msg } = msg {
+			fundrecoverer.handle_peer_storage(nodes[1].node.get_our_node_id(), msg);
+			assert_eq!(*node_id, nodes[0].node.get_our_node_id());
+		} else if let MessageSendEvent::SendYourPeerStorageMessage { ref node_id, ref msg } = msg {
+			fundrecoverer.handle_your_peer_storage(nodes[1].node.get_our_node_id(), msg);
+			assert_eq!(*node_id, nodes[0].node.get_our_node_id());
+		} else {
+			panic!("Unexpected event")
+		}
+	}
+
+	let recovery_event = fundrecoverer.get_and_clear_recovery_pending_events();
+	assert_eq!(recovery_event.len(), 1);
+	match recovery_event[0] {
+		RecoveryEvent::RescanBlock { .. } => {},
+		_ => panic!("Unexpected event"),
+	};
+
+	let bogus_chan_reestablish = fundrecoverer.get_and_clear_pending_msg_events();
+	assert_eq!(bogus_chan_reestablish.len(), 1);
+
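+	// nodes[1] will again broadcast its latest state in response to the stale reestablish, but
+	// here we mine the revoked commitment captured above instead, so the recoverer must sweep
+	// the funds via a penalty transaction.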
+	match bogus_chan_reestablish[0] {
+		MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
+			assert_eq!(nodes[1].node.get_our_node_id(), *node_id);
+			nodes[1].node.handle_channel_reestablish(nodes[0].node.get_our_node_id(), msg);
+		},
+		_ => panic!("Unexpected event"),
+	}
+
+	let block = create_dummy_block(nodes[0].best_block_hash(), 42, vec![revoked_local_txn[0].clone()]);
+	connect_block(&nodes[1], &block);
+	// Since the fundrecoverer is acting as our chain::Watch, feed it the block as well.
+	let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
+	let height = nodes[0].best_block_info().1 + 1;
+
+	nodes[0].blocks.lock().unwrap().push((block.clone(), height));
+	fundrecoverer.best_block_updated(&block.header, height);
+	fundrecoverer.transactions_confirmed(&block.header, &txdata, height);
+
+	check_closed_broadcast!(nodes[1], true);
+
+	let events_2 = nodes[1].node.get_and_clear_pending_events();
+	assert_eq!(events_2.len(), 1);
+	match events_2[0] {
+		Event::ChannelClosed { .. } => {},
+		_ => panic!("Unexpected event"),
+	}
+	check_added_monitors!(nodes[1], 1);
+
+	// The recoverer should have detected the revoked commitment and broadcast a penalty
+	// transaction claiming its output.
+	let penalty = node_cfgs[0].tx_broadcaster.txn_broadcasted.lock().unwrap().clone();
+	assert_eq!(penalty.len(), 1);
+	assert_eq!(penalty[0].input.len(), 1);
+
+	let block = create_dummy_block(nodes[1].best_block_hash(), 42, vec![penalty[0].clone()]);
+	let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
+	connect_block(&nodes[1], &block);
+
+	nodes[0].blocks.lock().unwrap().push((block.clone(), height + 1));
+	fundrecoverer.best_block_updated(&block.header, height + 1);
+	fundrecoverer.transactions_confirmed(&block.header, &txdata, height + 1);
+
+	let mut dummy_block = create_dummy_block(nodes[1].best_block_hash(), height, Vec::new());
+	for i in 1..CHAN_CONFIRM_DEPTH {
+		let prev_blockhash = dummy_block.header.block_hash();
+		let dummy_txdata: Vec<_> = dummy_block.txdata.iter().enumerate().collect();
+		fundrecoverer.best_block_updated(&dummy_block.header, height + i + 1);
+		fundrecoverer.transactions_confirmed(&dummy_block.header, &dummy_txdata, height + i + 1);
+		dummy_block = create_dummy_block(prev_blockhash, height + i + 1, Vec::new());
+	}
+
+	// Clear the chain source so that the `drop` doesn't panic.
+	nodes[0].chain_source.clear_watched_txn_and_outputs();
+
+	for event in fundrecoverer.get_and_clear_pending_events() {
+		match event {
+			Event::SpendableOutputs { mut outputs, channel_id: _ } => {
+				for outp in outputs.drain(..) {
+					match outp {
+						SpendableOutputDescriptor::StaticOutput { output, .. } => {
+							assert_eq!(output.value.to_sat(), penalty[0].output[0].value.to_sat());
+						},
+						_ => panic!("Unexpected output type"),
+					}
+				}
+			},
+			_ => panic!("Unexpected event"),
+		};
+	}
+}
+
 #[test]
 fn test_claim_sizeable_push_msat() {
 	// Incidentally test SpendableOutput event generation due to detection of to_local output on commitment tx