Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use REST interface for calling get_raw_transaction #1636

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ futures = "0.3.21"
hex = "0.4.3"
html-escaper = "0.2.0"
http = "0.2.6"
hyper = { version = "0.14.24", features = ["http1", "client"] }
indicatif = "0.17.1"
lazy_static = "1.4.0"
log = "0.4.14"
Expand Down
1 change: 1 addition & 0 deletions src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use {
};

mod entry;
mod rest;
mod rtx;
mod updater;

Expand Down
41 changes: 41 additions & 0 deletions src/index/rest.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use {
anyhow::Result,
bitcoin::{consensus::deserialize, BlockHash, Transaction, Txid},
hyper::{client::HttpConnector, Client, Uri},
std::str::FromStr,
};

pub(crate) struct Rest {
client: Client<HttpConnector>,
url: String,
}

impl Rest {
pub(crate) fn new(url: &str) -> Self {
let url = if !url.starts_with("http://") {
"http://".to_string() + url
} else {
url.to_string()
};
Rest {
client: Client::new(),
url,
}
}

pub(crate) async fn get_block_hash(&self, height: u32) -> Result<BlockHash> {
let url = format!("{}/rest/blockhashbyheight/{height}.bin", self.url);
let res = self.client.get(Uri::from_str(&url)?).await?;
let buf = hyper::body::to_bytes(res).await?;
let block_hash = deserialize(&buf)?;
Ok(block_hash)
}

pub(crate) async fn get_raw_transaction(&self, txid: &Txid) -> Result<Transaction> {
let url = format!("{}/rest/tx/{txid:x}.bin", self.url);
let res = self.client.get(Uri::from_str(&url)?).await?;
let buf = hyper::body::to_bytes(res).await?;
let tx = deserialize(&buf)?;
Ok(tx)
}
}
108 changes: 105 additions & 3 deletions src/index/updater.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
use {self::inscription_updater::InscriptionUpdater, super::*, std::sync::mpsc};
use {
self::inscription_updater::InscriptionUpdater,
super::{rest::Rest, *},
futures::future::try_join_all,
std::sync::mpsc,
tokio::sync::mpsc::{error::TryRecvError, Receiver, Sender},
};

mod inscription_updater;

Expand Down Expand Up @@ -68,6 +74,42 @@ impl Updater {
updater.update_index(index, wtx)
}

fn spawn_tx_fetcher(rest: Rest, rt: Runtime) -> (Sender<OutPoint>, Receiver<u64>) {
let (outpoint_sender, mut outpoint_receiver) = tokio::sync::mpsc::channel::<OutPoint>(20_000);
let (value_sender, value_receiver) = tokio::sync::mpsc::channel::<u64>(20_000);

std::thread::spawn(move || {
rt.block_on(async move {
loop {
let Some(outpoint) = outpoint_receiver.recv().await else {
return;
};
let mut outpoints = vec![outpoint];
// Default -rpcworkqueue is 16. Going past it breaks.
// This can be set higher for better sync speed if -rpcworkqueue is configured higher
const BATCH_SIZE: usize = 15;
for _ in 0..BATCH_SIZE-1 {
let Ok(outpoint) = outpoint_receiver.try_recv() else {
break;
};
outpoints.push(outpoint);
}
let futs = outpoints.iter().map(|outpoint| rest.get_raw_transaction(&outpoint.txid));
let Ok(txs) = try_join_all(futs).await else {
return;
};
for (i, tx) in txs.iter().enumerate() {
let Ok(_) = value_sender.send(tx.output[usize::try_from(outpoints[i].vout).unwrap()].value).await else {
return;
};
}
}
})
});

(outpoint_sender, value_receiver)
}

fn update_index<'index>(
&mut self,
index: &'index Index,
Expand All @@ -92,6 +134,23 @@ impl Updater {

let rx = Self::fetch_blocks_from(index, self.height, self.index_sats)?;

let rest = Rest::new(index.rpc_url.split("/wallet").next().unwrap());
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
let (mut outpoint_sender, mut value_receiver) =
match rt.block_on(async { rest.get_block_hash(0).await }) {
Ok(_) => {
let (a, b) = Self::spawn_tx_fetcher(rest, rt);
(Some(a), Some(b))
}
Err(_) => {
log::warn!("Could not connect to REST endpoint, falling back to RPC");
(None, None)
}
};

let mut uncommitted = 0;
let mut value_cache = HashMap::new();
loop {
Expand All @@ -100,7 +159,14 @@ impl Updater {
Err(mpsc::RecvError) => break,
};

self.index_block(index, &mut wtx, block, &mut value_cache)?;
self.index_block(
index,
&mut outpoint_sender,
&mut value_receiver,
&mut wtx,
block,
&mut value_cache,
)?;

if let Some(progress_bar) = &mut progress_bar {
progress_bar.inc(1);
Expand Down Expand Up @@ -246,10 +312,46 @@ impl Updater {
fn index_block(
&mut self,
index: &Index,
outpoint_sender: &mut Option<Sender<OutPoint>>,
value_receiver: &mut Option<Receiver<u64>>,
wtx: &mut WriteTransaction,
block: BlockData,
value_cache: &mut HashMap<OutPoint, u64>,
) -> Result<()> {
if let Some(value_receiver) = value_receiver.as_mut() {
let Err(TryRecvError::Empty) = value_receiver.try_recv() else { return Err(anyhow!("Previous block did not consume all input values")); };
}

let mut outpoint_to_value = wtx.open_table(OUTPOINT_TO_VALUE)?;

if !self.index_sats {
if let Some(outpoint_sender) = outpoint_sender.as_ref() {
let txids = block
.txdata
.iter()
.map(|(_, txid)| txid)
.collect::<HashSet<_>>();
for (tx, _) in &block.txdata {
for input in &tx.input {
let prev_output = input.previous_output;
if prev_output.is_null() {
continue;
}
if txids.contains(&prev_output.txid) {
continue;
}
if value_cache.contains_key(&prev_output) {
continue;
}
if outpoint_to_value.get(&prev_output.store())?.is_some() {
continue;
}
outpoint_sender.blocking_send(prev_output)?;
}
}
}
}

let mut height_to_block_hash = wtx.open_table(HEIGHT_TO_BLOCK_HASH)?;

let start = Instant::now();
Expand Down Expand Up @@ -279,7 +381,6 @@ impl Updater {
let mut inscription_id_to_satpoint = wtx.open_table(INSCRIPTION_ID_TO_SATPOINT)?;
let mut inscription_number_to_inscription_id =
wtx.open_table(INSCRIPTION_NUMBER_TO_INSCRIPTION_ID)?;
let mut outpoint_to_value = wtx.open_table(OUTPOINT_TO_VALUE)?;
let mut sat_to_inscription_id = wtx.open_table(SAT_TO_INSCRIPTION_ID)?;
let mut satpoint_to_inscription_id = wtx.open_table(SATPOINT_TO_INSCRIPTION_ID)?;
let mut statistic_to_count = wtx.open_table(STATISTIC_TO_COUNT)?;
Expand All @@ -293,6 +394,7 @@ impl Updater {
self.height,
&mut inscription_id_to_satpoint,
index,
value_receiver,
&mut inscription_id_to_inscription_entry,
lost_sats,
&mut inscription_number_to_inscription_id,
Expand Down
10 changes: 10 additions & 0 deletions src/index/updater/inscription_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub(super) struct InscriptionUpdater<'a, 'db, 'tx> {
height: u64,
id_to_satpoint: &'a mut Table<'db, 'tx, &'static InscriptionIdValue, &'static SatPointValue>,
index: &'a Index,
value_receiver: &'a mut Option<Receiver<u64>>,
id_to_entry: &'a mut Table<'db, 'tx, &'static InscriptionIdValue, InscriptionEntryValue>,
lost_sats: u64,
next_number: u64,
Expand All @@ -33,6 +34,7 @@ impl<'a, 'db, 'tx> InscriptionUpdater<'a, 'db, 'tx> {
height: u64,
id_to_satpoint: &'a mut Table<'db, 'tx, &'static InscriptionIdValue, &'static SatPointValue>,
index: &'a Index,
value_receiver: &'a mut Option<Receiver<u64>>,
id_to_entry: &'a mut Table<'db, 'tx, &'static InscriptionIdValue, InscriptionEntryValue>,
lost_sats: u64,
number_to_id: &'a mut Table<'db, 'tx, u64, &'static InscriptionIdValue>,
Expand All @@ -54,6 +56,7 @@ impl<'a, 'db, 'tx> InscriptionUpdater<'a, 'db, 'tx> {
height,
id_to_satpoint,
index,
value_receiver,
id_to_entry,
lost_sats,
next_number,
Expand Down Expand Up @@ -97,6 +100,13 @@ impl<'a, 'db, 'tx> InscriptionUpdater<'a, 'db, 'tx> {
.remove(&tx_in.previous_output.store())?
{
value.value()
} else if let Some(value_receiver) = self.value_receiver.as_mut() {
value_receiver.blocking_recv().ok_or_else(|| {
anyhow!(
"failed to get transaction for {}",
tx_in.previous_output.txid
)
})?
} else {
self
.index
Expand Down