diff --git a/Cargo.toml b/Cargo.toml index b20ef222d..a5058ebc4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "crates/file_store", "crates/electrum", "crates/esplora", + "crates/bitcoind_rpc", "example-crates/example_cli", "example-crates/example_electrum", "example-crates/example_esplora", diff --git a/crates/bitcoind_rpc/Cargo.toml b/crates/bitcoind_rpc/Cargo.toml new file mode 100644 index 000000000..eeb9de581 --- /dev/null +++ b/crates/bitcoind_rpc/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "bdk_bitcoind_rpc" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +# For no-std, remember to enable the bitcoin/no-std feature +bitcoin = { version = "0.30", default-features = false } +bitcoincore-rpc = { version = "0.17" } + +[dev-dependencies] +bdk_chain = { path = "../chain", version = "0.5", features = ["serde", "miniscript"] } +bitcoind = { version = "0.33", features = ["25_0"] } +anyhow = { version = "1" } + +[features] +default = ["std"] +std = ["bitcoin/std"] +serde = ["bitcoin/serde"] diff --git a/crates/bitcoind_rpc/src/lib.rs b/crates/bitcoind_rpc/src/lib.rs new file mode 100644 index 000000000..a4b28c8e8 --- /dev/null +++ b/crates/bitcoind_rpc/src/lib.rs @@ -0,0 +1,243 @@ +//! This crate is used for emitting blockchain data from the `bitcoind` RPC interface (excluding the +//! RPC wallet API). +//! +//! [`Emitter`] is the main structure which sources blockchain data from [`bitcoincore_rpc::Client`]. +//! +//! To only get block updates (exclude mempool transactions), the caller can use +//! [`Emitter::next_block`] or/and [`Emitter::next_header`] until it returns `Ok(None)` (which means +//! the chain tip is reached). A separate method, [`Emitter::mempool`] can be used to emit the whole +//! mempool. +#![warn(missing_docs)] + +use std::collections::BTreeMap; + +use bitcoin::{block::Header, Block, BlockHash, Transaction}; +pub use bitcoincore_rpc; +use bitcoincore_rpc::bitcoincore_rpc_json; + +/// A structure that emits data sourced from [`bitcoincore_rpc::Client`]. +/// +/// Refer to [module-level documentation] for more. +/// +/// [module-level documentation]: crate +pub struct Emitter<'c, C> { + client: &'c C, + start_height: u32, + + emitted_blocks: BTreeMap, + last_block: Option, + + /// The latest first-seen epoch of emitted mempool transactions. This is used to determine + /// whether a mempool transaction is already emitted. + last_mempool_time: usize, + + /// The last emitted block during our last mempool emission. This is used to determine whether + /// there has been a reorg since our last mempool emission. + last_mempool_tip: Option<(u32, BlockHash)>, +} + +impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> { + /// Constructs a new [`Emitter`] with the provided [`bitcoincore_rpc::Client`]. + /// + /// `start_height` is the block height to start emitting blocks from. + pub fn new(client: &'c C, start_height: u32) -> Self { + Self { + client, + start_height, + emitted_blocks: BTreeMap::new(), + last_block: None, + last_mempool_time: 0, + last_mempool_tip: None, + } + } + + /// Emit mempool transactions, alongside their first-seen unix timestamps. + /// + /// Ideally, this method would only emit the same transaction once. However, if the receiver + /// filters transactions based on whether it alters the output set of tracked script pubkeys, + /// there are situations where we would want to re-emit. For example, if an emitted mempool + /// transaction spends a tracked UTXO which is confirmed at height `h`, but the receiver has + /// only seen up to block of height `h-1`, we want to re-emit this transaction until the + /// receiver has seen the block at height `h`. + /// + /// In other words, we want to re-emit a transaction if we cannot guarantee it's ancestors are + /// already emitted. + pub fn mempool(&mut self) -> Result, bitcoincore_rpc::Error> { + let client = self.client; + + let prev_mempool_tip = match self.last_mempool_tip { + // use 'avoid-re-emission' logic if there is no reorg + Some((height, hash)) if self.emitted_blocks.get(&height) == Some(&hash) => height, + _ => 0, + }; + + let prev_mempool_time = self.last_mempool_time; + let mut latest_time = prev_mempool_time; + + let txs_to_emit = client + .get_raw_mempool_verbose()? + .into_iter() + .filter_map({ + let latest_time = &mut latest_time; + move |(txid, tx_entry)| -> Option> { + let tx_time = tx_entry.time as usize; + if tx_time > *latest_time { + *latest_time = tx_time; + } + + // Avoid emitting transactions that are already emitted if we can guarantee + // blocks containing ancestors are already emitted. The bitcoind rpc interface + // provides us with the block height that the tx is introduced to the mempool. + // If we have already emitted the block of height, we can assume that all + // ancestor txs have been processed by the receiver. + let is_already_emitted = tx_time <= prev_mempool_time; + let is_within_height = tx_entry.height <= prev_mempool_tip as _; + if is_already_emitted && is_within_height { + return None; + } + + let tx = match client.get_raw_transaction(&txid, None) { + Ok(tx) => tx, + // the tx is confirmed or evicted since `get_raw_mempool_verbose` + Err(err) if err.is_not_found_error() => return None, + Err(err) => return Some(Err(err)), + }; + + Some(Ok((tx, tx_time as u64))) + } + }) + .collect::, _>>()?; + + self.last_mempool_time = latest_time; + self.last_mempool_tip = self + .emitted_blocks + .iter() + .last() + .map(|(&height, &hash)| (height, hash)); + + Ok(txs_to_emit) + } + + /// Emit the next block height and header (if any). + pub fn next_header(&mut self) -> Result, bitcoincore_rpc::Error> { + poll(self, |hash| self.client.get_block_header(hash)) + } + + /// Emit the next block height and block (if any). + pub fn next_block(&mut self) -> Result, bitcoincore_rpc::Error> { + poll(self, |hash| self.client.get_block(hash)) + } +} + +enum PollResponse { + Block(bitcoincore_rpc_json::GetBlockResult), + NoMoreBlocks, + /// Fetched block is not in the best chain. + BlockNotInBestChain, + AgreementFound(bitcoincore_rpc_json::GetBlockResult), + AgreementPointNotFound, +} + +fn poll_once(emitter: &Emitter) -> Result +where + C: bitcoincore_rpc::RpcApi, +{ + let client = emitter.client; + + if let Some(last_res) = &emitter.last_block { + assert!(!emitter.emitted_blocks.is_empty()); + + let next_hash = match last_res.nextblockhash { + None => return Ok(PollResponse::NoMoreBlocks), + Some(next_hash) => next_hash, + }; + + let res = client.get_block_info(&next_hash)?; + if res.confirmations < 0 { + return Ok(PollResponse::BlockNotInBestChain); + } + return Ok(PollResponse::Block(res)); + } + + if emitter.emitted_blocks.is_empty() { + let hash = client.get_block_hash(emitter.start_height as _)?; + + let res = client.get_block_info(&hash)?; + if res.confirmations < 0 { + return Ok(PollResponse::BlockNotInBestChain); + } + return Ok(PollResponse::Block(res)); + } + + for (&_, hash) in emitter.emitted_blocks.iter().rev() { + let res = client.get_block_info(hash)?; + if res.confirmations < 0 { + // block is not in best chain + continue; + } + + // agreement point found + return Ok(PollResponse::AgreementFound(res)); + } + + Ok(PollResponse::AgreementPointNotFound) +} + +fn poll( + emitter: &mut Emitter, + get_item: F, +) -> Result, bitcoincore_rpc::Error> +where + C: bitcoincore_rpc::RpcApi, + F: Fn(&BlockHash) -> Result, +{ + loop { + match poll_once(emitter)? { + PollResponse::Block(res) => { + let height = res.height as u32; + let item = get_item(&res.hash)?; + assert_eq!(emitter.emitted_blocks.insert(height, res.hash), None); + emitter.last_block = Some(res); + return Ok(Some((height, item))); + } + PollResponse::NoMoreBlocks => { + emitter.last_block = None; + return Ok(None); + } + PollResponse::BlockNotInBestChain => { + emitter.last_block = None; + continue; + } + PollResponse::AgreementFound(res) => { + emitter.emitted_blocks.split_off(&(res.height as u32 + 1)); + emitter.last_block = Some(res); + continue; + } + PollResponse::AgreementPointNotFound => { + emitter.emitted_blocks.clear(); + emitter.last_block = None; + continue; + } + } + } +} + +/// Extends [`bitcoincore_rpc::Error`]. +pub trait BitcoindRpcErrorExt { + /// Returns whether the error is a "not found" error. + /// + /// This is useful since [`Emitter`] emits [`Result<_, bitcoincore_rpc::Error>`]s as + /// [`Iterator::Item`]. + fn is_not_found_error(&self) -> bool; +} + +impl BitcoindRpcErrorExt for bitcoincore_rpc::Error { + fn is_not_found_error(&self) -> bool { + if let bitcoincore_rpc::Error::JsonRpc(bitcoincore_rpc::jsonrpc::Error::Rpc(rpc_err)) = self + { + rpc_err.code == -5 + } else { + false + } + } +}