Skip to content

Commit

Permalink
Batch tx requests and re-enable skipping transactions (#1759)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewtoth authored Feb 20, 2023
1 parent cc112e2 commit 9f46f08
Show file tree
Hide file tree
Showing 8 changed files with 311 additions and 24 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ members = [".", "test-bitcoincore-rpc"]
anyhow = { version = "1.0.56", features = ["backtrace"] }
axum = { version = "0.6.1", features = ["headers"] }
axum-server = "0.4.0"
base64 = "0.13.1"
bech32 = "0.9.1"
bip39 = "1.0.1"
bitcoin = { version = "0.29.1", features = ["rand"] }
Expand All @@ -31,6 +32,7 @@ futures = "0.3.21"
hex = "0.4.3"
html-escaper = "0.2.0"
http = "0.2.6"
hyper = { version = "0.14.24", features = ["http1", "client"] }
indicatif = "0.17.1"
lazy_static = "1.4.0"
log = "0.4.14"
Expand Down
53 changes: 53 additions & 0 deletions src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use {
};

mod entry;
mod fetcher;
mod rtx;
mod updater;

Expand Down Expand Up @@ -1026,6 +1027,58 @@ mod tests {
}
}

#[test]
fn inscriptions_below_first_inscription_height_are_skipped() {
let inscription = inscription("text/plain;charset=utf-8", "hello");
let template = TransactionTemplate {
inputs: &[(1, 0, 0)],
witness: inscription.to_witness(),
..Default::default()
};

{
let context = Context::builder().build();
context.mine_blocks(1);
let txid = context.rpc_server.broadcast_tx(template.clone());
let inscription_id = InscriptionId::from(txid);
context.mine_blocks(1);

assert_eq!(
context.index.get_inscription_by_id(inscription_id).unwrap(),
Some(inscription)
);

assert_eq!(
context
.index
.get_inscription_satpoint_by_id(inscription_id)
.unwrap(),
Some(SatPoint {
outpoint: OutPoint { txid, vout: 0 },
offset: 0,
})
);
}

{
let context = Context::builder()
.arg("--first-inscription-height=3")
.build();
context.mine_blocks(1);
let txid = context.rpc_server.broadcast_tx(template);
let inscription_id = InscriptionId::from(txid);
context.mine_blocks(1);

assert_eq!(
context
.index
.get_inscription_satpoint_by_id(inscription_id)
.unwrap(),
None,
);
}
}

#[test]
fn list_first_coinbase_transaction() {
let context = Context::builder().arg("--index-sats").build();
Expand Down
112 changes: 112 additions & 0 deletions src/index/fetcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
use {
anyhow::{anyhow, Result},
bitcoin::{Transaction, Txid},
bitcoincore_rpc::Auth,
hyper::{client::HttpConnector, Body, Client, Method, Request, Uri},
serde::Deserialize,
serde_json::{json, Value},
};

pub(crate) struct Fetcher {
client: Client<HttpConnector>,
url: Uri,
auth: String,
}

#[derive(Deserialize, Debug)]
struct JsonResponse<T> {
result: Option<T>,
error: Option<JsonError>,
id: usize,
}

#[derive(Deserialize, Debug)]
struct JsonError {
code: i32,
message: String,
}

impl Fetcher {
pub(crate) fn new(url: &str, auth: Auth) -> Result<Self> {
if auth == Auth::None {
return Err(anyhow!("No rpc authentication provided"));
}

let client = Client::new();

let url = if url.starts_with("http://") {
url.to_string()
} else {
"http://".to_string() + url
};

let url = Uri::try_from(&url).map_err(|e| anyhow!("Invalid rpc url {url}: {e}"))?;

let (user, password) = auth.get_user_pass()?;
let auth = format!("{}:{}", user.unwrap(), password.unwrap());
let auth = format!("Basic {}", &base64::encode(auth));
Ok(Fetcher { client, url, auth })
}

pub(crate) async fn get_transactions(&self, txids: Vec<Txid>) -> Result<Vec<Transaction>> {
if txids.is_empty() {
return Ok(Vec::new());
}

let mut reqs = Vec::with_capacity(txids.len());
for (i, txid) in txids.iter().enumerate() {
let req = json!({
"jsonrpc": "2.0",
"id": i, // Use the index as id, so we can quickly sort the response
"method": "getrawtransaction",
"params": [ txid ]
});
reqs.push(req);
}

let body = Value::Array(reqs).to_string();
let req = Request::builder()
.method(Method::POST)
.uri(&self.url)
.header(hyper::header::AUTHORIZATION, &self.auth)
.header(hyper::header::CONTENT_TYPE, "application/json")
.body(Body::from(body))?;

let response = self.client.request(req).await?;

let buf = hyper::body::to_bytes(response).await?;

let mut results: Vec<JsonResponse<String>> = serde_json::from_slice(&buf)?;

// Return early on any error, because we need all results to proceed
if let Some(err) = results.iter().find_map(|res| res.error.as_ref()) {
return Err(anyhow!(
"Failed to fetch raw transaction: code {} message {}",
err.code,
err.message
));
}

// Results from batched JSON-RPC requests can come back in any order, so we must sort them by id
results.sort_by(|a, b| a.id.cmp(&b.id));

let txs = results
.into_iter()
.map(|res| {
res
.result
.ok_or_else(|| anyhow!("Missing result for batched JSON-RPC response"))
.and_then(|str| {
hex::decode(str)
.map_err(|e| anyhow!("Result for batched JSON-RPC response not valid hex: {e}"))
})
.and_then(|hex| {
bitcoin::consensus::deserialize(&hex).map_err(|e| {
anyhow!("Result for batched JSON-RPC response not valid bitcoin tx: {e}")
})
})
})
.collect::<Result<Vec<Transaction>>>()?;
Ok(txs)
}
}
135 changes: 129 additions & 6 deletions src/index/updater.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
use {self::inscription_updater::InscriptionUpdater, super::*, std::sync::mpsc};
use {
self::inscription_updater::InscriptionUpdater,
super::{fetcher::Fetcher, *},
futures::future::try_join_all,
std::sync::mpsc,
tokio::sync::mpsc::{error::TryRecvError, Receiver, Sender},
};

mod inscription_updater;

Expand Down Expand Up @@ -92,6 +98,8 @@ impl Updater {

let rx = Self::fetch_blocks_from(index, self.height, self.index_sats)?;

let (mut outpoint_sender, mut value_receiver) = Self::spawn_fetcher(index)?;

let mut uncommitted = 0;
let mut value_cache = HashMap::new();
loop {
Expand All @@ -100,7 +108,14 @@ impl Updater {
Err(mpsc::RecvError) => break,
};

self.index_block(index, &mut wtx, block, &mut value_cache)?;
self.index_block(
index,
&mut outpoint_sender,
&mut value_receiver,
&mut wtx,
block,
&mut value_cache,
)?;

if let Some(progress_bar) = &mut progress_bar {
progress_bar.inc(1);
Expand Down Expand Up @@ -168,8 +183,7 @@ impl Updater {
let client =
Client::new(&index.rpc_url, index.auth.clone()).context("failed to connect to RPC URL")?;

// NB: We temporarily always fetch transactions, to avoid expensive cache misses.
let first_inscription_height = index.first_inscription_height.min(0);
let first_inscription_height = index.first_inscription_height;

thread::spawn(move || loop {
if let Some(height_limit) = height_limit {
Expand Down Expand Up @@ -243,13 +257,123 @@ impl Updater {
}
}

fn spawn_fetcher(index: &Index) -> Result<(Sender<OutPoint>, Receiver<u64>)> {
let fetcher = Fetcher::new(&index.rpc_url, index.auth.clone())?;

// Not sure if any block has more than 20k inputs, but none so far after first inscription block
const CHANNEL_BUFFER_SIZE: usize = 20_000;
let (outpoint_sender, mut outpoint_receiver) =
tokio::sync::mpsc::channel::<OutPoint>(CHANNEL_BUFFER_SIZE);
let (value_sender, value_receiver) = tokio::sync::mpsc::channel::<u64>(CHANNEL_BUFFER_SIZE);

// Batch 2048 missing inputs at a time. Arbitrarily chosen for now, maybe higher or lower can be faster?
// Did rudimentary benchmarks with 1024 and 4096 and time was roughly the same.
const BATCH_SIZE: usize = 2048;
// Default rpcworkqueue in bitcoind is 16, meaning more than 16 concurrent requests will be rejected.
// Since we are already requesting blocks on a separate thread, and we don't want to break if anything
// else runs a request, we keep this to 12.
const PARALLEL_REQUESTS: usize = 12;

std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async move {
loop {
let Some(outpoint) = outpoint_receiver.recv().await else {
log::debug!("Outpoint channel closed");
return;
};
// There's no try_iter on tokio::sync::mpsc::Receiver like std::sync::mpsc::Receiver.
// So we just loop until BATCH_SIZE doing try_recv until it returns None.
let mut outpoints = vec![outpoint];
for _ in 0..BATCH_SIZE-1 {
let Ok(outpoint) = outpoint_receiver.try_recv() else {
break;
};
outpoints.push(outpoint);
}
// Break outpoints into chunks for parallel requests
let chunk_size = (outpoints.len() / PARALLEL_REQUESTS) + 1;
let mut futs = Vec::with_capacity(PARALLEL_REQUESTS);
for chunk in outpoints.chunks(chunk_size) {
let txids = chunk.iter().map(|outpoint| outpoint.txid).collect();
let fut = fetcher.get_transactions(txids);
futs.push(fut);
}
let txs = match try_join_all(futs).await {
Ok(txs) => txs,
Err(e) => {
log::error!("Couldn't receive txs {e}");
return;
}
};
// Send all tx output values back in order
for (i, tx) in txs.iter().flatten().enumerate() {
let Ok(_) = value_sender.send(tx.output[usize::try_from(outpoints[i].vout).unwrap()].value).await else {
log::error!("Value channel closed unexpectedly");
return;
};
}
}
})
});

Ok((outpoint_sender, value_receiver))
}

fn index_block(
&mut self,
index: &Index,
outpoint_sender: &mut Sender<OutPoint>,
value_receiver: &mut Receiver<u64>,
wtx: &mut WriteTransaction,
block: BlockData,
value_cache: &mut HashMap<OutPoint, u64>,
) -> Result<()> {
// If value_receiver still has values something went wrong with the last block
// Could be an assert, shouldn't recover from this and commit the last block
let Err(TryRecvError::Empty) = value_receiver.try_recv() else {
return Err(anyhow!("Previous block did not consume all input values"));
};

let mut outpoint_to_value = wtx.open_table(OUTPOINT_TO_VALUE)?;

if !self.index_sats {
// Send all missing input outpoints to be fetched right away
let txids = block
.txdata
.iter()
.map(|(_, txid)| txid)
.collect::<HashSet<_>>();
for (tx, _) in &block.txdata {
for input in &tx.input {
let prev_output = input.previous_output;
// We don't need coinbase input value
if prev_output.is_null() {
continue;
}
// We don't need input values from txs earlier in the block, since they'll be added to value_cache
// when the tx is indexed
if txids.contains(&prev_output.txid) {
continue;
}
// We don't need input values we already have in our value_cache from earlier blocks
if value_cache.contains_key(&prev_output) {
continue;
}
// We don't need input values we already have in our outpoint_to_value table from earlier blocks that
// were committed to db already
if outpoint_to_value.get(&prev_output.store())?.is_some() {
continue;
}
// We don't know the value of this tx input. Send this outpoint to background thread to be fetched
outpoint_sender.blocking_send(prev_output)?;
}
}
}

let mut height_to_block_hash = wtx.open_table(HEIGHT_TO_BLOCK_HASH)?;

let start = Instant::now();
Expand Down Expand Up @@ -279,7 +403,6 @@ impl Updater {
let mut inscription_id_to_satpoint = wtx.open_table(INSCRIPTION_ID_TO_SATPOINT)?;
let mut inscription_number_to_inscription_id =
wtx.open_table(INSCRIPTION_NUMBER_TO_INSCRIPTION_ID)?;
let mut outpoint_to_value = wtx.open_table(OUTPOINT_TO_VALUE)?;
let mut sat_to_inscription_id = wtx.open_table(SAT_TO_INSCRIPTION_ID)?;
let mut satpoint_to_inscription_id = wtx.open_table(SATPOINT_TO_INSCRIPTION_ID)?;
let mut statistic_to_count = wtx.open_table(STATISTIC_TO_COUNT)?;
Expand All @@ -292,7 +415,7 @@ impl Updater {
let mut inscription_updater = InscriptionUpdater::new(
self.height,
&mut inscription_id_to_satpoint,
index,
value_receiver,
&mut inscription_id_to_inscription_entry,
lost_sats,
&mut inscription_number_to_inscription_id,
Expand Down
Loading

1 comment on commit 9f46f08

@GaloisField2718
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, is contains your batch for long tx-index ?

Please sign in to comment.