From 29d149323dd29546698a916deb5ae00bc99b8319 Mon Sep 17 00:00:00 2001 From: Casey Rodarmor Date: Fri, 18 Feb 2022 23:58:36 -0800 Subject: [PATCH] Incremental indexing (#141) --- src/index.rs | 71 +++++++++++++++++++++++++++++++++++++---- src/subcommand.rs | 2 +- src/subcommand/find.rs | 2 +- src/subcommand/index.rs | 2 +- src/subcommand/info.rs | 13 ++------ src/subcommand/list.rs | 2 +- tests/index.rs | 22 +++++++++++++ tests/info.rs | 2 ++ tests/integration.rs | 15 +++++++-- tests/rpc_server.rs | 16 ++++++++-- 10 files changed, 121 insertions(+), 26 deletions(-) diff --git a/src/index.rs b/src/index.rs index 8694c7f260..d92bb5e813 100644 --- a/src/index.rs +++ b/src/index.rs @@ -10,10 +10,12 @@ pub(crate) struct Index { } impl Index { + const HEIGHT_TO_HASH: TableDefinition<'static, u64, [u8]> = + TableDefinition::new("HEIGHT_TO_HASH"); const OUTPOINT_TO_ORDINAL_RANGES: TableDefinition<'static, [u8], [u8]> = TableDefinition::new("OUTPOINT_TO_ORDINAL_RANGES"); - pub(crate) fn new(options: Options) -> Result { + pub(crate) fn open(options: Options) -> Result { let client = Client::new( &options.rpc_url.ok_or("This command requires `--rpc-url`")?, options @@ -32,17 +34,49 @@ impl Index { Err(error) => return Err(error.into()), }; - let index = Self { + Ok(Self { client, database, sleep_until: Cell::new(Instant::now()), - }; + }) + } + + #[allow(clippy::self_named_constructors)] + pub(crate) fn index(options: Options) -> Result { + let index = Self::open(options)?; index.index_ranges()?; Ok(index) } + pub(crate) fn print_info(&self) -> Result { + let tx = self.database.begin_write()?; + + let height_to_hash = tx.open_table(&Self::HEIGHT_TO_HASH)?; + + let blocks_indexed = height_to_hash + .range_reversed(0..)? + .next() + .map(|(height, _hash)| height + 1) + .unwrap_or(0); + + let outputs_indexed = tx.open_table(&Self::OUTPOINT_TO_ORDINAL_RANGES)?.len()?; + + tx.abort()?; + + let stats = self.database.stats()?; + + println!("blocks indexed: {}", blocks_indexed); + println!("outputs indexed: {}", outputs_indexed); + println!("tree height: {}", stats.tree_height()); + println!("free pages: {}", stats.free_pages()); + println!("stored: {}", Bytes(stats.stored_bytes())); + println!("overhead: {}", Bytes(stats.overhead_bytes())); + println!("fragmented: {}", Bytes(stats.fragmented_bytes())); + Ok(()) + } + fn client(&self) -> &Client { let now = Instant::now(); @@ -62,11 +96,34 @@ impl Index { fn index_ranges(&self) -> Result { log::info!("Indexing ranges…"); - let mut height = 0; - while let Some(block) = self.block(height)? { + loop { + let wtx = self.database.begin_write()?; + + let mut height_to_hash = wtx.open_table(&Self::HEIGHT_TO_HASH)?; + let height = height_to_hash + .range_reversed(0..)? + .next() + .map(|(height, _hash)| height + 1) + .unwrap_or(0); + log::info!("Indexing block at height {height}…"); - let wtx = self.database.begin_write()?; + let block = match self.block(height)? { + Some(block) => block, + None => { + wtx.abort()?; + break; + } + }; + + if let Some(prev_height) = height.checked_sub(1) { + let prev_hash = height_to_hash.get(&prev_height)?.unwrap(); + + if prev_hash != block.header.prev_blockhash.as_ref() { + return Err("Reorg detected at or before {prev_height}".into()); + } + } + let mut outpoint_to_ordinal_ranges = wtx.open_table(&Self::OUTPOINT_TO_ORDINAL_RANGES)?; let mut coinbase_inputs = VecDeque::new(); @@ -172,8 +229,8 @@ impl Index { } } + height_to_hash.insert(&height, &block.block_hash())?; wtx.commit()?; - height += 1; } Ok(()) diff --git a/src/subcommand.rs b/src/subcommand.rs index 5e33818f8b..6ad3e4346a 100644 --- a/src/subcommand.rs +++ b/src/subcommand.rs @@ -33,7 +33,7 @@ impl Subcommand { Self::Name(name) => name.run(), Self::Range(range) => range.run(), Self::Supply => supply::run(), - Self::Info => info::run(), + Self::Info => info::run(options), Self::Traits(traits) => traits.run(), } } diff --git a/src/subcommand/find.rs b/src/subcommand/find.rs index 1bfbbbd0cf..ce0461a003 100644 --- a/src/subcommand/find.rs +++ b/src/subcommand/find.rs @@ -11,7 +11,7 @@ pub(crate) struct Find { impl Find { pub(crate) fn run(self, options: Options) -> Result<()> { - let index = Index::new(options)?; + let index = Index::open(options)?; let creation_height = self.ordinal.height().n(); let block = index.block(creation_height)?.unwrap(); diff --git a/src/subcommand/index.rs b/src/subcommand/index.rs index 72e5d56d3e..56f4139958 100644 --- a/src/subcommand/index.rs +++ b/src/subcommand/index.rs @@ -1,6 +1,6 @@ use super::*; pub(crate) fn run(options: Options) -> Result<()> { - Index::new(options)?; + Index::index(options)?; Ok(()) } diff --git a/src/subcommand/info.rs b/src/subcommand/info.rs index e122d0e075..e62ce5a676 100644 --- a/src/subcommand/info.rs +++ b/src/subcommand/info.rs @@ -1,15 +1,6 @@ use super::*; -pub(crate) fn run() -> Result { - let database = unsafe { Database::open("index.redb")? }; - - let stats = database.stats()?; - - println!("tree height: {}", stats.tree_height()); - println!("free pages: {}", stats.free_pages()); - println!("stored: {}", Bytes(stats.stored_bytes())); - println!("overhead: {}", Bytes(stats.overhead_bytes())); - println!("fragmented: {}", Bytes(stats.fragmented_bytes())); - +pub(crate) fn run(options: Options) -> Result { + Index::open(options)?.print_info()?; Ok(()) } diff --git a/src/subcommand/list.rs b/src/subcommand/list.rs index 11ab995641..30e4c222b4 100644 --- a/src/subcommand/list.rs +++ b/src/subcommand/list.rs @@ -7,7 +7,7 @@ pub(crate) struct List { impl List { pub(crate) fn run(self, options: Options) -> Result<()> { - let index = Index::new(options)?; + let index = Index::index(options)?; let ranges = index.list(self.outpoint)?; for (start, end) in ranges { diff --git a/tests/index.rs b/tests/index.rs index 203fc4ddf9..d8fc7a3b70 100644 --- a/tests/index.rs +++ b/tests/index.rs @@ -14,6 +14,28 @@ fn default_index_size() -> Result { Ok(()) } +#[test] +fn incremental_indexing() -> Result { + let output = Test::new()? + .command("list 0396bc915f141f7de025f72ae9b6bb8dcdb5f444fc245d8fac486ba67a38eef9:0") + .expected_stdout("[0,5000000000)\n") + .block() + .output()?; + + assert_eq!(output.calls, &["getblockhash", "getblock", "getblockhash"]); + + let output = Test::with_tempdir(output.tempdir) + .command("list 9068a11b8769174363376b606af9a4b8b29dd7b13d013f4b0cbbd457db3c3ce5:0") + .expected_stdout("[5000000000,10000000000)\n") + .block() + .block() + .output()?; + + assert_eq!(output.calls, &["getblockhash", "getblock", "getblockhash"]); + + Ok(()) +} + #[test] fn custom_index_size() -> Result { let tempdir = Test::new()? diff --git a/tests/info.rs b/tests/info.rs index e96f0934ad..7199149e8d 100644 --- a/tests/info.rs +++ b/tests/info.rs @@ -8,6 +8,8 @@ fn basic() -> Result { .command("info") .stdout_regex( r" + blocks indexed: 1 + outputs indexed: 1 tree height: \d+ free pages: \d+ stored: .* bytes diff --git a/tests/integration.rs b/tests/integration.rs index 302facee86..9663d2176e 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -6,7 +6,14 @@ use { }, executable_path::executable_path, regex::Regex, - std::{collections::BTreeSet, error::Error, process::Command, str, thread}, + std::{ + collections::BTreeSet, + error::Error, + process::Command, + str, + sync::{Arc, Mutex}, + thread, + }, tempfile::TempDir, unindent::Unindent, }; @@ -31,6 +38,7 @@ enum Expected { } struct Output { + calls: Vec, stdout: String, tempdir: TempDir, } @@ -142,7 +150,7 @@ impl Test { } fn output(self) -> Result { - let (close_handle, port) = RpcServer::spawn(&self.blocks); + let (close_handle, calls, port) = RpcServer::spawn(&self.blocks); let output = Command::new(executable_path("ord")) .current_dir(&self.tempdir) @@ -178,9 +186,12 @@ impl Test { Expected::Ignore => {} } + let calls = calls.lock().unwrap().clone(); + Ok(Output { stdout: stdout.to_string(), tempdir: self.tempdir, + calls, }) } diff --git a/tests/rpc_server.rs b/tests/rpc_server.rs index 2811d1b17b..5aa6ee17ed 100644 --- a/tests/rpc_server.rs +++ b/tests/rpc_server.rs @@ -14,12 +14,16 @@ pub trait RpcApi { pub struct RpcServer { blocks: Vec, + calls: Arc>>, } impl RpcServer { - pub(crate) fn spawn(blocks: &[Block]) -> (CloseHandle, u16) { + pub(crate) fn spawn(blocks: &[Block]) -> (CloseHandle, Arc>>, u16) { + let calls = Arc::new(Mutex::new(Vec::new())); + let server = Self { blocks: blocks.to_vec(), + calls: calls.clone(), }; let mut io = IoHandler::default(); io.extend_with(server.to_delegate()); @@ -35,12 +39,18 @@ impl RpcServer { thread::spawn(|| server.wait()); - (close_handle, port) + (close_handle, calls, port) + } + + fn call(&self, method: &str) { + self.calls.lock().unwrap().push(method.into()); } } impl RpcApi for RpcServer { fn getblockhash(&self, height: usize) -> Result { + self.call("getblockhash"); + match self.blocks.get(height) { Some(block) => Ok(block.block_hash()), None => Err(jsonrpc_core::Error::new( @@ -50,6 +60,8 @@ impl RpcApi for RpcServer { } fn getblock(&self, blockhash: BlockHash, verbosity: u64) -> Result { + self.call("getblock"); + assert_eq!(verbosity, 0, "Verbosity level {verbosity} is unsupported"); for block in &self.blocks {