diff --git a/Cargo.lock b/Cargo.lock index 13ca03b08d..015882d3ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2179,6 +2179,7 @@ dependencies = [ "hex", "html-escaper", "http", + "hyper", "indicatif", "lazy_static", "log", diff --git a/Cargo.toml b/Cargo.toml index 945ed67866..be769a07b6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ futures = "0.3.21" hex = "0.4.3" html-escaper = "0.2.0" http = "0.2.6" +hyper = { version = "0.14.24", features = ["http1", "client"] } indicatif = "0.17.1" lazy_static = "1.4.0" log = "0.4.14" diff --git a/src/index.rs b/src/index.rs index 8406e14eb6..0b050b11dc 100644 --- a/src/index.rs +++ b/src/index.rs @@ -19,6 +19,7 @@ use { }; mod entry; +mod rest; mod rtx; mod updater; diff --git a/src/index/rest.rs b/src/index/rest.rs new file mode 100644 index 0000000000..eca11e6686 --- /dev/null +++ b/src/index/rest.rs @@ -0,0 +1,41 @@ +use { + anyhow::Result, + bitcoin::{consensus::deserialize, BlockHash, Transaction, Txid}, + hyper::{client::HttpConnector, Client, Uri}, + std::str::FromStr, +}; + +pub(crate) struct Rest { + client: Client, + url: String, +} + +impl Rest { + pub(crate) fn new(url: &str) -> Self { + let url = if !url.starts_with("http://") { + "http://".to_string() + url + } else { + url.to_string() + }; + Rest { + client: Client::new(), + url, + } + } + + pub(crate) async fn get_block_hash(&self, height: u32) -> Result { + let url = format!("{}/rest/blockhashbyheight/{height}.bin", self.url); + let res = self.client.get(Uri::from_str(&url)?).await?; + let buf = hyper::body::to_bytes(res).await?; + let block_hash = deserialize(&buf)?; + Ok(block_hash) + } + + pub(crate) async fn get_raw_transaction(&self, txid: &Txid) -> Result { + let url = format!("{}/rest/tx/{txid:x}.bin", self.url); + let res = self.client.get(Uri::from_str(&url)?).await?; + let buf = hyper::body::to_bytes(res).await?; + let tx = deserialize(&buf)?; + Ok(tx) + } +} diff --git a/src/index/updater.rs b/src/index/updater.rs index 7dae185f57..d047ef77c3 100644 --- a/src/index/updater.rs +++ b/src/index/updater.rs @@ -1,4 +1,10 @@ -use {self::inscription_updater::InscriptionUpdater, super::*, std::sync::mpsc}; +use { + self::inscription_updater::InscriptionUpdater, + super::{rest::Rest, *}, + futures::future::try_join_all, + std::sync::mpsc, + tokio::sync::mpsc::{error::TryRecvError, Receiver, Sender}, +}; mod inscription_updater; @@ -68,6 +74,42 @@ impl Updater { updater.update_index(index, wtx) } + fn spawn_tx_fetcher(rest: Rest, rt: Runtime) -> (Sender, Receiver) { + let (outpoint_sender, mut outpoint_receiver) = tokio::sync::mpsc::channel::(20_000); + let (value_sender, value_receiver) = tokio::sync::mpsc::channel::(20_000); + + std::thread::spawn(move || { + rt.block_on(async move { + loop { + let Some(outpoint) = outpoint_receiver.recv().await else { + return; + }; + let mut outpoints = vec![outpoint]; + // Default -rpcworkqueue is 16. Going past it breaks. + // This can be set higher for better sync speed if -rpcworkqueue is configured higher + const BATCH_SIZE: usize = 15; + for _ in 0..BATCH_SIZE-1 { + let Ok(outpoint) = outpoint_receiver.try_recv() else { + break; + }; + outpoints.push(outpoint); + } + let futs = outpoints.iter().map(|outpoint| rest.get_raw_transaction(&outpoint.txid)); + let Ok(txs) = try_join_all(futs).await else { + return; + }; + for (i, tx) in txs.iter().enumerate() { + let Ok(_) = value_sender.send(tx.output[usize::try_from(outpoints[i].vout).unwrap()].value).await else { + return; + }; + } + } + }) + }); + + (outpoint_sender, value_receiver) + } + fn update_index<'index>( &mut self, index: &'index Index, @@ -92,6 +134,23 @@ impl Updater { let rx = Self::fetch_blocks_from(index, self.height, self.index_sats)?; + let rest = Rest::new(index.rpc_url.split("/wallet").next().unwrap()); + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + let (mut outpoint_sender, mut value_receiver) = + match rt.block_on(async { rest.get_block_hash(0).await }) { + Ok(_) => { + let (a, b) = Self::spawn_tx_fetcher(rest, rt); + (Some(a), Some(b)) + } + Err(_) => { + log::warn!("Could not connect to REST endpoint, falling back to RPC"); + (None, None) + } + }; + let mut uncommitted = 0; let mut value_cache = HashMap::new(); loop { @@ -100,7 +159,14 @@ impl Updater { Err(mpsc::RecvError) => break, }; - self.index_block(index, &mut wtx, block, &mut value_cache)?; + self.index_block( + index, + &mut outpoint_sender, + &mut value_receiver, + &mut wtx, + block, + &mut value_cache, + )?; if let Some(progress_bar) = &mut progress_bar { progress_bar.inc(1); @@ -246,10 +312,46 @@ impl Updater { fn index_block( &mut self, index: &Index, + outpoint_sender: &mut Option>, + value_receiver: &mut Option>, wtx: &mut WriteTransaction, block: BlockData, value_cache: &mut HashMap, ) -> Result<()> { + if let Some(value_receiver) = value_receiver.as_mut() { + let Err(TryRecvError::Empty) = value_receiver.try_recv() else { return Err(anyhow!("Previous block did not consume all input values")); }; + } + + let mut outpoint_to_value = wtx.open_table(OUTPOINT_TO_VALUE)?; + + if !self.index_sats { + if let Some(outpoint_sender) = outpoint_sender.as_ref() { + let txids = block + .txdata + .iter() + .map(|(_, txid)| txid) + .collect::>(); + for (tx, _) in &block.txdata { + for input in &tx.input { + let prev_output = input.previous_output; + if prev_output.is_null() { + continue; + } + if txids.contains(&prev_output.txid) { + continue; + } + if value_cache.contains_key(&prev_output) { + continue; + } + if outpoint_to_value.get(&prev_output.store())?.is_some() { + continue; + } + outpoint_sender.blocking_send(prev_output)?; + } + } + } + } + let mut height_to_block_hash = wtx.open_table(HEIGHT_TO_BLOCK_HASH)?; let start = Instant::now(); @@ -279,7 +381,6 @@ impl Updater { let mut inscription_id_to_satpoint = wtx.open_table(INSCRIPTION_ID_TO_SATPOINT)?; let mut inscription_number_to_inscription_id = wtx.open_table(INSCRIPTION_NUMBER_TO_INSCRIPTION_ID)?; - let mut outpoint_to_value = wtx.open_table(OUTPOINT_TO_VALUE)?; let mut sat_to_inscription_id = wtx.open_table(SAT_TO_INSCRIPTION_ID)?; let mut satpoint_to_inscription_id = wtx.open_table(SATPOINT_TO_INSCRIPTION_ID)?; let mut statistic_to_count = wtx.open_table(STATISTIC_TO_COUNT)?; @@ -293,6 +394,7 @@ impl Updater { self.height, &mut inscription_id_to_satpoint, index, + value_receiver, &mut inscription_id_to_inscription_entry, lost_sats, &mut inscription_number_to_inscription_id, diff --git a/src/index/updater/inscription_updater.rs b/src/index/updater/inscription_updater.rs index 9fe83b13ee..2c17c6d85f 100644 --- a/src/index/updater/inscription_updater.rs +++ b/src/index/updater/inscription_updater.rs @@ -16,6 +16,7 @@ pub(super) struct InscriptionUpdater<'a, 'db, 'tx> { height: u64, id_to_satpoint: &'a mut Table<'db, 'tx, &'static InscriptionIdValue, &'static SatPointValue>, index: &'a Index, + value_receiver: &'a mut Option>, id_to_entry: &'a mut Table<'db, 'tx, &'static InscriptionIdValue, InscriptionEntryValue>, lost_sats: u64, next_number: u64, @@ -33,6 +34,7 @@ impl<'a, 'db, 'tx> InscriptionUpdater<'a, 'db, 'tx> { height: u64, id_to_satpoint: &'a mut Table<'db, 'tx, &'static InscriptionIdValue, &'static SatPointValue>, index: &'a Index, + value_receiver: &'a mut Option>, id_to_entry: &'a mut Table<'db, 'tx, &'static InscriptionIdValue, InscriptionEntryValue>, lost_sats: u64, number_to_id: &'a mut Table<'db, 'tx, u64, &'static InscriptionIdValue>, @@ -54,6 +56,7 @@ impl<'a, 'db, 'tx> InscriptionUpdater<'a, 'db, 'tx> { height, id_to_satpoint, index, + value_receiver, id_to_entry, lost_sats, next_number, @@ -97,6 +100,13 @@ impl<'a, 'db, 'tx> InscriptionUpdater<'a, 'db, 'tx> { .remove(&tx_in.previous_output.store())? { value.value() + } else if let Some(value_receiver) = self.value_receiver.as_mut() { + value_receiver.blocking_recv().ok_or_else(|| { + anyhow!( + "failed to get transaction for {}", + tx_in.previous_output.txid + ) + })? } else { self .index