Skip to content

Commit

Permalink
Incremental indexing (#141)
Browse files Browse the repository at this point in the history
  • Loading branch information
casey authored Feb 19, 2022
1 parent 6782b7c commit 29d1493
Show file tree
Hide file tree
Showing 10 changed files with 121 additions and 26 deletions.
71 changes: 64 additions & 7 deletions src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ pub(crate) struct Index {
}

impl Index {
const HEIGHT_TO_HASH: TableDefinition<'static, u64, [u8]> =
TableDefinition::new("HEIGHT_TO_HASH");
const OUTPOINT_TO_ORDINAL_RANGES: TableDefinition<'static, [u8], [u8]> =
TableDefinition::new("OUTPOINT_TO_ORDINAL_RANGES");

pub(crate) fn new(options: Options) -> Result<Self> {
pub(crate) fn open(options: Options) -> Result<Self> {
let client = Client::new(
&options.rpc_url.ok_or("This command requires `--rpc-url`")?,
options
Expand All @@ -32,17 +34,49 @@ impl Index {
Err(error) => return Err(error.into()),
};

let index = Self {
Ok(Self {
client,
database,
sleep_until: Cell::new(Instant::now()),
};
})
}

#[allow(clippy::self_named_constructors)]
pub(crate) fn index(options: Options) -> Result<Self> {
let index = Self::open(options)?;

index.index_ranges()?;

Ok(index)
}

pub(crate) fn print_info(&self) -> Result {
let tx = self.database.begin_write()?;

let height_to_hash = tx.open_table(&Self::HEIGHT_TO_HASH)?;

let blocks_indexed = height_to_hash
.range_reversed(0..)?
.next()
.map(|(height, _hash)| height + 1)
.unwrap_or(0);

let outputs_indexed = tx.open_table(&Self::OUTPOINT_TO_ORDINAL_RANGES)?.len()?;

tx.abort()?;

let stats = self.database.stats()?;

println!("blocks indexed: {}", blocks_indexed);
println!("outputs indexed: {}", outputs_indexed);
println!("tree height: {}", stats.tree_height());
println!("free pages: {}", stats.free_pages());
println!("stored: {}", Bytes(stats.stored_bytes()));
println!("overhead: {}", Bytes(stats.overhead_bytes()));
println!("fragmented: {}", Bytes(stats.fragmented_bytes()));
Ok(())
}

fn client(&self) -> &Client {
let now = Instant::now();

Expand All @@ -62,11 +96,34 @@ impl Index {
fn index_ranges(&self) -> Result {
log::info!("Indexing ranges…");

let mut height = 0;
while let Some(block) = self.block(height)? {
loop {
let wtx = self.database.begin_write()?;

let mut height_to_hash = wtx.open_table(&Self::HEIGHT_TO_HASH)?;
let height = height_to_hash
.range_reversed(0..)?
.next()
.map(|(height, _hash)| height + 1)
.unwrap_or(0);

log::info!("Indexing block at height {height}…");

let wtx = self.database.begin_write()?;
let block = match self.block(height)? {
Some(block) => block,
None => {
wtx.abort()?;
break;
}
};

if let Some(prev_height) = height.checked_sub(1) {
let prev_hash = height_to_hash.get(&prev_height)?.unwrap();

if prev_hash != block.header.prev_blockhash.as_ref() {
return Err("Reorg detected at or before {prev_height}".into());
}
}

let mut outpoint_to_ordinal_ranges = wtx.open_table(&Self::OUTPOINT_TO_ORDINAL_RANGES)?;

let mut coinbase_inputs = VecDeque::new();
Expand Down Expand Up @@ -172,8 +229,8 @@ impl Index {
}
}

height_to_hash.insert(&height, &block.block_hash())?;
wtx.commit()?;
height += 1;
}

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion src/subcommand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl Subcommand {
Self::Name(name) => name.run(),
Self::Range(range) => range.run(),
Self::Supply => supply::run(),
Self::Info => info::run(),
Self::Info => info::run(options),
Self::Traits(traits) => traits.run(),
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/subcommand/find.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub(crate) struct Find {

impl Find {
pub(crate) fn run(self, options: Options) -> Result<()> {
let index = Index::new(options)?;
let index = Index::open(options)?;

let creation_height = self.ordinal.height().n();
let block = index.block(creation_height)?.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/subcommand/index.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::*;

pub(crate) fn run(options: Options) -> Result<()> {
Index::new(options)?;
Index::index(options)?;
Ok(())
}
13 changes: 2 additions & 11 deletions src/subcommand/info.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,6 @@
use super::*;

pub(crate) fn run() -> Result {
let database = unsafe { Database::open("index.redb")? };

let stats = database.stats()?;

println!("tree height: {}", stats.tree_height());
println!("free pages: {}", stats.free_pages());
println!("stored: {}", Bytes(stats.stored_bytes()));
println!("overhead: {}", Bytes(stats.overhead_bytes()));
println!("fragmented: {}", Bytes(stats.fragmented_bytes()));

pub(crate) fn run(options: Options) -> Result {
Index::open(options)?.print_info()?;
Ok(())
}
2 changes: 1 addition & 1 deletion src/subcommand/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub(crate) struct List {

impl List {
pub(crate) fn run(self, options: Options) -> Result<()> {
let index = Index::new(options)?;
let index = Index::index(options)?;
let ranges = index.list(self.outpoint)?;

for (start, end) in ranges {
Expand Down
22 changes: 22 additions & 0 deletions tests/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,28 @@ fn default_index_size() -> Result {
Ok(())
}

#[test]
fn incremental_indexing() -> Result {
let output = Test::new()?
.command("list 0396bc915f141f7de025f72ae9b6bb8dcdb5f444fc245d8fac486ba67a38eef9:0")
.expected_stdout("[0,5000000000)\n")
.block()
.output()?;

assert_eq!(output.calls, &["getblockhash", "getblock", "getblockhash"]);

let output = Test::with_tempdir(output.tempdir)
.command("list 9068a11b8769174363376b606af9a4b8b29dd7b13d013f4b0cbbd457db3c3ce5:0")
.expected_stdout("[5000000000,10000000000)\n")
.block()
.block()
.output()?;

assert_eq!(output.calls, &["getblockhash", "getblock", "getblockhash"]);

Ok(())
}

#[test]
fn custom_index_size() -> Result {
let tempdir = Test::new()?
Expand Down
2 changes: 2 additions & 0 deletions tests/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ fn basic() -> Result {
.command("info")
.stdout_regex(
r"
blocks indexed: 1
outputs indexed: 1
tree height: \d+
free pages: \d+
stored: .* bytes
Expand Down
15 changes: 13 additions & 2 deletions tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,14 @@ use {
},
executable_path::executable_path,
regex::Regex,
std::{collections::BTreeSet, error::Error, process::Command, str, thread},
std::{
collections::BTreeSet,
error::Error,
process::Command,
str,
sync::{Arc, Mutex},
thread,
},
tempfile::TempDir,
unindent::Unindent,
};
Expand All @@ -31,6 +38,7 @@ enum Expected {
}

struct Output {
calls: Vec<String>,
stdout: String,
tempdir: TempDir,
}
Expand Down Expand Up @@ -142,7 +150,7 @@ impl Test {
}

fn output(self) -> Result<Output> {
let (close_handle, port) = RpcServer::spawn(&self.blocks);
let (close_handle, calls, port) = RpcServer::spawn(&self.blocks);

let output = Command::new(executable_path("ord"))
.current_dir(&self.tempdir)
Expand Down Expand Up @@ -178,9 +186,12 @@ impl Test {
Expected::Ignore => {}
}

let calls = calls.lock().unwrap().clone();

Ok(Output {
stdout: stdout.to_string(),
tempdir: self.tempdir,
calls,
})
}

Expand Down
16 changes: 14 additions & 2 deletions tests/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@ pub trait RpcApi {

pub struct RpcServer {
blocks: Vec<Block>,
calls: Arc<Mutex<Vec<String>>>,
}

impl RpcServer {
pub(crate) fn spawn(blocks: &[Block]) -> (CloseHandle, u16) {
pub(crate) fn spawn(blocks: &[Block]) -> (CloseHandle, Arc<Mutex<Vec<String>>>, u16) {
let calls = Arc::new(Mutex::new(Vec::new()));

let server = Self {
blocks: blocks.to_vec(),
calls: calls.clone(),
};
let mut io = IoHandler::default();
io.extend_with(server.to_delegate());
Expand All @@ -35,12 +39,18 @@ impl RpcServer {

thread::spawn(|| server.wait());

(close_handle, port)
(close_handle, calls, port)
}

fn call(&self, method: &str) {
self.calls.lock().unwrap().push(method.into());
}
}

impl RpcApi for RpcServer {
fn getblockhash(&self, height: usize) -> Result<BlockHash> {
self.call("getblockhash");

match self.blocks.get(height) {
Some(block) => Ok(block.block_hash()),
None => Err(jsonrpc_core::Error::new(
Expand All @@ -50,6 +60,8 @@ impl RpcApi for RpcServer {
}

fn getblock(&self, blockhash: BlockHash, verbosity: u64) -> Result<String> {
self.call("getblock");

assert_eq!(verbosity, 0, "Verbosity level {verbosity} is unsupported");

for block in &self.blocks {
Expand Down

0 comments on commit 29d1493

Please sign in to comment.