From 5d6e258061638eb62da13ffe7549f1ac866d01df Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Mon, 23 Oct 2023 10:26:00 +0200 Subject: [PATCH] Implement `ElectrumSyncClient` --- lightning-transaction-sync/Cargo.toml | 5 +- lightning-transaction-sync/src/common.rs | 1 + lightning-transaction-sync/src/electrum.rs | 402 +++++++++++++++++++++ lightning-transaction-sync/src/error.rs | 22 +- lightning-transaction-sync/src/lib.rs | 10 +- 5 files changed, 430 insertions(+), 10 deletions(-) create mode 100644 lightning-transaction-sync/src/electrum.rs diff --git a/lightning-transaction-sync/Cargo.toml b/lightning-transaction-sync/Cargo.toml index 92513f0c850..4af24b60f93 100644 --- a/lightning-transaction-sync/Cargo.toml +++ b/lightning-transaction-sync/Cargo.toml @@ -18,6 +18,7 @@ default = [] esplora-async = ["async-interface", "esplora-client/async", "futures"] esplora-async-https = ["esplora-async", "reqwest/rustls-tls"] esplora-blocking = ["esplora-client/blocking"] +electrum = ["electrum-client"] async-interface = [] [dependencies] @@ -27,9 +28,9 @@ bdk-macros = "0.6" futures = { version = "0.3", optional = true } esplora-client = { version = "0.4", default-features = false, optional = true } reqwest = { version = "0.11", optional = true, default-features = false, features = ["json"] } +electrum-client = { version = "0.14.1", optional = true } [dev-dependencies] lightning = { version = "0.0.118", path = "../lightning", features = ["std", "_test_utils"] } -electrsd = { version = "0.22.0", features = ["legacy", "esplora_a33e97e1", "bitcoind_23_0"] } -electrum-client = "0.12.0" +electrsd = { version = "0.23.3", features = ["legacy", "esplora_a33e97e1", "bitcoind_23_0"] } tokio = { version = "1.14.0", features = ["full"] } diff --git a/lightning-transaction-sync/src/common.rs b/lightning-transaction-sync/src/common.rs index a6ee61e90f2..aebddae6eb5 100644 --- a/lightning-transaction-sync/src/common.rs +++ b/lightning-transaction-sync/src/common.rs @@ -67,6 +67,7 @@ impl FilterQueue { } } +#[derive(Debug)] pub(crate) struct ConfirmedTx { pub tx: Transaction, pub block_header: BlockHeader, diff --git a/lightning-transaction-sync/src/electrum.rs b/lightning-transaction-sync/src/electrum.rs new file mode 100644 index 00000000000..aca23841462 --- /dev/null +++ b/lightning-transaction-sync/src/electrum.rs @@ -0,0 +1,402 @@ +use crate::common::{ConfirmedTx, SyncState, FilterQueue}; +use crate::error::{TxSyncError, InternalError}; + +use electrum_client::Client as ElectrumClient; +use electrum_client::ElectrumApi; + +use lightning::util::logger::Logger; +use lightning::{log_error, log_debug, log_trace}; +use lightning::chain::WatchedOutput; +use lightning::chain::{Confirm, Filter}; + +use bitcoin::{BlockHash, BlockHeader, Script, Txid}; + +use std::ops::Deref; +use std::sync::Mutex; +use std::collections::HashSet; +use std::time::Instant; + +/// Synchronizes LDK with a given Electrum server. +/// +/// Needs to be registered with a [`ChainMonitor`] via the [`Filter`] interface to be informed of +/// transactions and outputs to monitor for on-chain confirmation, unconfirmation, and +/// reconfirmation. +/// +/// Note that registration via [`Filter`] needs to happen before any calls to +/// [`Watch::watch_channel`] to ensure we get notified of the items to monitor. +/// +/// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor +/// [`Watch::watch_channel`]: lightning::chain::Watch::watch_channel +/// [`Filter`]: lightning::chain::Filter +pub struct ElectrumSyncClient +where + L::Target: Logger, +{ + sync_state: Mutex, + queue: Mutex, + client: ElectrumClient, + logger: L, +} + +impl ElectrumSyncClient +where + L::Target: Logger, +{ + /// Returns a new [`ElectrumSyncClient`] object. + pub fn new(server_url: String, logger: L) -> Result { + let client = ElectrumClient::new(&server_url).map_err(|e| { + log_error!(logger, "Failed to connect to electrum server '{}': {}", server_url, e); + e + })?; + + Self::from_client(client, logger) + } + + /// Returns a new [`ElectrumSyncClient`] object using the given Electrum client. + pub fn from_client(client: ElectrumClient, logger: L) -> Result { + let sync_state = Mutex::new(SyncState::new()); + let queue = Mutex::new(FilterQueue::new()); + + Ok(Self { + sync_state, + queue, + client, + logger, + }) + } + + /// Synchronizes the given `confirmables` via their [`Confirm`] interface implementations. This + /// method should be called regularly to keep LDK up-to-date with current chain data. + /// + /// For example, instances of [`ChannelManager`] and [`ChainMonitor`] can be informed about the + /// newest on-chain activity related to the items previously registered via the [`Filter`] + /// interface. + /// + /// [`Confirm`]: lightning::chain::Confirm + /// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor + /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager + /// [`Filter`]: lightning::chain::Filter + pub fn sync(&self, confirmables: Vec<&(dyn Confirm + Sync + Send)>) -> Result<(), TxSyncError> { + // This lock makes sure we're syncing once at a time. + let mut sync_state = self.sync_state.lock().unwrap(); + + log_trace!(self.logger, "Starting transaction sync."); + let start_time = Instant::now(); + let mut num_confirmed = 0; + let mut num_unconfirmed = 0; + + let tip_notification = self.client.block_headers_subscribe()?; + let mut tip_header = tip_notification.header; + let mut tip_height = tip_notification.height as u32; + + loop { + let pending_registrations = self.queue.lock().unwrap().process_queues(&mut sync_state); + let tip_is_new = Some(tip_header.block_hash()) != sync_state.last_sync_hash; + + // We loop until any registered transactions have been processed at least once, or the + // tip hasn't been updated during the last iteration. + if !sync_state.pending_sync && !pending_registrations && !tip_is_new { + // Nothing to do. + break; + } else { + // Update the known tip to the newest one. + if tip_is_new { + // First check for any unconfirmed transactions and act on it immediately. + match self.get_unconfirmed_transactions(&confirmables) { + Ok(unconfirmed_txs) => { + // Double-check the tip hash. If it changed, a reorg happened since + // we started syncing and we need to restart last-minute. + if let Some(notification) = self.client.block_headers_pop()? { + let check_tip_hash = notification.header.block_hash(); + if check_tip_hash != tip_header.block_hash() { + tip_header = notification.header; + tip_height = notification.height as u32; + continue; + } + } + + num_unconfirmed += unconfirmed_txs.len(); + self.sync_unconfirmed_transactions(&mut sync_state, &confirmables, unconfirmed_txs); + self.sync_best_block_updated(&confirmables, &tip_header, tip_height); + }, + Err(err) => { + // (Semi-)permanent failure, retry later. + log_error!(self.logger, "Failed during transaction sync, aborting."); + sync_state.pending_sync = true; + return Err(TxSyncError::from(err)); + } + } + } + + match self.get_confirmed_transactions(&sync_state) { + Ok(confirmed_txs) => { + // Double-check the tip hash. If it changed, a reorg happened since + // we started syncing and we need to restart last-minute. + if let Some(notification) = self.client.block_headers_pop()? { + let check_tip_hash = notification.header.block_hash(); + if check_tip_hash != tip_header.block_hash() { + tip_header = notification.header; + tip_height = notification.height as u32; + continue; + } + } + + num_confirmed += confirmed_txs.len(); + self.sync_confirmed_transactions( + &mut sync_state, + &confirmables, + confirmed_txs, + ); + } + Err(InternalError::Inconsistency) => { + // Immediately restart syncing when we encounter any inconsistencies. + log_debug!(self.logger, "Encountered inconsistency during transaction sync, restarting."); + sync_state.pending_sync = true; + continue; + } + Err(err) => { + // (Semi-)permanent failure, retry later. + log_error!(self.logger, "Failed during transaction sync, aborting."); + sync_state.pending_sync = true; + return Err(TxSyncError::from(err)); + } + } + sync_state.last_sync_hash = Some(tip_header.block_hash()); + sync_state.pending_sync = false; + } + } + log_debug!(self.logger, "Finished transaction sync at tip {} in {}ms: {} confirmed, {} unconfirmed.", + tip_header.block_hash(), start_time.elapsed().as_millis(), num_confirmed, num_unconfirmed); + Ok(()) + } + + fn sync_unconfirmed_transactions( + &self, sync_state: &mut SyncState, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, unconfirmed_txs: Vec, + ) { + for txid in unconfirmed_txs { + for c in confirmables { + c.transaction_unconfirmed(&txid); + } + + sync_state.watched_transactions.insert(txid); + } + } + + + fn sync_best_block_updated( + &self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, tip_header: &BlockHeader, + tip_height: u32, + ) { + for c in confirmables { + c.best_block_updated(&tip_header, tip_height); + } + } + + fn sync_confirmed_transactions( + &self, sync_state: &mut SyncState, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, + confirmed_txs: Vec + ) { + for ctx in confirmed_txs { + for c in confirmables { + c.transactions_confirmed( + &ctx.block_header, + &[(ctx.pos, &ctx.tx)], + ctx.block_height, + ); + } + + sync_state.watched_transactions.remove(&ctx.tx.txid()); + + for input in &ctx.tx.input { + sync_state.watched_outputs.remove(&input.previous_output); + } + } + } + + fn get_confirmed_transactions( + &self, sync_state: &SyncState, + ) -> Result, InternalError> { + + // First, check the confirmation status of registered transactions as well as the + // status of dependent transactions of registered outputs. + let mut confirmed_txs = Vec::new(); + let mut watched_script_pubkeys = Vec::new(); + let mut watched_txs = Vec::new(); + let watched_outputs = sync_state.watched_outputs.values().collect::>(); + + for txid in &sync_state.watched_transactions { + if let Ok(tx) = self.client.transaction_get(&txid) { + watched_txs.push((txid, tx.clone())); + if let Some(tx_out) = tx.output.first() { + watched_script_pubkeys.push(tx_out.script_pubkey.clone()); + } else { + debug_assert!(false, "Failed due to retrieving invalid tx data."); + log_trace!(self.logger, "Failed due to retrieving invalid tx data."); + return Err(InternalError::Failed); + } + } + } + + let num_tx_lookups = watched_script_pubkeys.len(); + debug_assert_eq!(num_tx_lookups, watched_txs.len()); + + for output in &watched_outputs { + watched_script_pubkeys.push(output.script_pubkey.clone()); + } + + let num_output_spend_lookups = watched_script_pubkeys.len() - num_tx_lookups; + debug_assert_eq!(num_output_spend_lookups, watched_outputs.len()); + + match self.client.batch_script_get_history(&watched_script_pubkeys) { + Ok(mut tx_results) => { + let output_results = tx_results.split_off(num_tx_lookups); + debug_assert_eq!(num_output_spend_lookups, output_results.len()); + for (i, script_history) in tx_results.iter().enumerate() { + let (txid, tx) = &watched_txs[i]; + let filtered_history: Vec<_> = script_history.iter().filter(|h| h.tx_hash == **txid).collect(); + debug_assert_eq!(filtered_history.len(), 1); + if let Some(filtered) = filtered_history.first() + { + let prob_conf_height = filtered.height as u32; + match self.client.transaction_get_merkle(txid, prob_conf_height as usize) { + Ok(merkle_res) => { + debug_assert_eq!(prob_conf_height, merkle_res.block_height as u32); + match self.client.block_header(prob_conf_height as usize) { + Ok(block_header) => { + // TODO can we check the merkle proof here to be sure? + confirmed_txs.push(ConfirmedTx { tx: tx.clone(), block_header, block_height: prob_conf_height, pos: merkle_res.pos }); + } + Err(e) => { + log_trace!(self.logger, "Failed to retrieve block header for height {}: {}.", prob_conf_height, e); + return Err(InternalError::Failed); + } + } + } + Err(e) => { + log_trace!(self.logger, "Inconsistency: Tx {} was unconfirmed during syncing: {}", txid, e); + return Err(InternalError::Inconsistency); + } + } + } + } + + for (i, script_history) in output_results.iter().enumerate() { + for possible_output_spend in script_history { + if possible_output_spend.height > 0 { + let txid = possible_output_spend.tx_hash; + match self.client.transaction_get(&txid) { + Ok(tx) => { + let mut is_spend = false; + for txin in &tx.input { + if txin.previous_output == watched_outputs[i].outpoint.into_bitcoin_outpoint() { + is_spend = true; + break; + } + } + + if is_spend { + let prob_conf_height = possible_output_spend.height as u32; + match self.client.transaction_get_merkle(&txid, prob_conf_height as usize) { + Ok(merkle_res) => { + debug_assert_eq!(prob_conf_height, merkle_res.block_height as u32); + match self.client.block_header(prob_conf_height as usize) { + Ok(block_header) => { + // TODO can we check the merkle proof here to be sure? + confirmed_txs.push(ConfirmedTx { tx: tx.clone(), block_header, block_height: prob_conf_height, pos: merkle_res.pos }); + } + Err(e) => { + log_trace!(self.logger, "Failed to retrieve block header for height {}: {}.", prob_conf_height, e); + return Err(InternalError::Failed); + } + } + } + Err(e) => { + log_trace!(self.logger, "Inconsistency: Tx {} was unconfirmed during syncing: {}", txid, e); + return Err(InternalError::Inconsistency); + } + } + } + } + Err(e) => { + log_trace!(self.logger, "Inconsistency: Tx {} was unconfirmed during syncing: {}", txid, e); + return Err(InternalError::Inconsistency); + } + } + } + } + } + } + Err(e) => { + log_trace!(self.logger, "Failed to script histories: {}.", e); + return Err(InternalError::Failed); + } + } + + // Sort all confirmed transactions first by block height, then by in-block + // position, and finally feed them to the interface in order. + confirmed_txs.sort_unstable_by(|tx1, tx2| { + tx1.block_height.cmp(&tx2.block_height).then_with(|| tx1.pos.cmp(&tx2.pos)) + }); + + Ok(confirmed_txs) + } + + fn get_unconfirmed_transactions( + &self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, + ) -> Result, InternalError> { + // Query the interface for relevant txids and check whether the relevant blocks are still + // in the best chain, mark them unconfirmed otherwise + let relevant_txids = confirmables + .iter() + .flat_map(|c| c.get_relevant_txids()) + .collect::)>>(); + + let mut unconfirmed_txs = Vec::new(); + + for (txid, block_hash_opt) in relevant_txids { + if let Some(block_hash) = block_hash_opt { + if let Ok(tx) = self.client.transaction_get(&txid) { + if let Some(tx_out) = tx.output.first() { + let script_history = self.client.script_get_history(&tx_out.script_pubkey)?; + if let Some(filtered) = script_history.iter() + .filter(|h| h.tx_hash == txid).max_by_key(|x| x.height) + { + let prob_conf_height = filtered.height; + let block_header = self.client.block_header(prob_conf_height as usize)?; + if block_header.block_hash() == block_hash { + // Skip if the tx is still confirmed in the block in question. + continue; + } + } + } + } + unconfirmed_txs.push(txid); + } else { + log_error!(self.logger, "Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!"); + panic!("Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!"); + } + } + Ok(unconfirmed_txs) + } + + /// Returns a reference to the underlying Electrum client. + pub fn client(&self) -> &ElectrumClient { + &self.client + } + +} + +impl Filter for ElectrumSyncClient +where + L::Target: Logger, +{ + fn register_tx(&self, txid: &Txid, _script_pubkey: &Script) { + let mut locked_queue = self.queue.lock().unwrap(); + locked_queue.transactions.insert(*txid); + } + + fn register_output(&self, output: WatchedOutput) { + let mut locked_queue = self.queue.lock().unwrap(); + locked_queue.outputs.insert(output.outpoint.into_bitcoin_outpoint(), output); + } +} diff --git a/lightning-transaction-sync/src/error.rs b/lightning-transaction-sync/src/error.rs index 73d9de70169..d1f4a319e86 100644 --- a/lightning-transaction-sync/src/error.rs +++ b/lightning-transaction-sync/src/error.rs @@ -18,7 +18,6 @@ impl fmt::Display for TxSyncError { } #[derive(Debug)] -#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))] pub(crate) enum InternalError { /// A transaction sync failed and needs to be retried eventually. Failed, @@ -26,7 +25,6 @@ pub(crate) enum InternalError { Inconsistency, } -#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))] impl fmt::Display for InternalError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self { @@ -38,9 +36,14 @@ impl fmt::Display for InternalError { } } -#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))] impl std::error::Error for InternalError {} +impl From for TxSyncError { + fn from(_e: InternalError) -> Self { + Self::Failed + } +} + #[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))] impl From for TxSyncError { fn from(_e: esplora_client::Error) -> Self { @@ -55,9 +58,16 @@ impl From for InternalError { } } -#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))] -impl From for TxSyncError { - fn from(_e: InternalError) -> Self { +#[cfg(feature = "electrum")] +impl From for InternalError { + fn from(_e: electrum_client::Error) -> Self { + Self::Failed + } +} + +#[cfg(feature = "electrum")] +impl From for TxSyncError { + fn from(_e: electrum_client::Error) -> Self { Self::Failed } } diff --git a/lightning-transaction-sync/src/lib.rs b/lightning-transaction-sync/src/lib.rs index ca3ce3f8ad6..21b6a4e97c1 100644 --- a/lightning-transaction-sync/src/lib.rs +++ b/lightning-transaction-sync/src/lib.rs @@ -74,11 +74,17 @@ extern crate bdk_macros; #[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))] mod esplora; -#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))] -mod common; +#[cfg(any(feature = "electrum"))] +mod electrum; +#[cfg(any(feature = "esplora-blocking", feature = "esplora-async", feature = "electrum"))] +mod common; +#[cfg(any(feature = "esplora-blocking", feature = "esplora-async", feature = "electrum"))] mod error; +#[cfg(any(feature = "esplora-blocking", feature = "esplora-async", feature = "electrum"))] pub use error::TxSyncError; #[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))] pub use esplora::EsploraSyncClient; +#[cfg(feature = "electrum")] +pub use electrum::ElectrumSyncClient;