Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Continuously index ranges #198

Merged
merged 16 commits into from
Mar 29, 2022
2 changes: 1 addition & 1 deletion justfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ fmt:
cargo fmt

clippy:
cargo clippy
cargo clippy --all --all-targets

bench:
cargo criterion
Expand Down
26 changes: 3 additions & 23 deletions src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use {
pub(crate) struct Index {
client: Client,
database: Database,
sleep_until: Cell<Instant>,
}

impl Index {
Expand All @@ -28,7 +27,6 @@ impl Index {
Ok(Self {
client,
database: Database::open(options).context("Failed to open database")?,
sleep_until: Cell::new(Instant::now()),
})
}

Expand All @@ -45,24 +43,6 @@ impl Index {
self.database.print_info()
}

fn client(&self) -> &Client {
if cfg!(target_os = "macos") {
let now = Instant::now();

let sleep_until = self.sleep_until.get();

if sleep_until > now {
thread::sleep(sleep_until - now);
}

self
.sleep_until
.set(Instant::now() + Duration::from_millis(2));
}

&self.client
}

fn decode_ordinal_range(bytes: [u8; 11]) -> (u64, u64) {
let n = u128::from_le_bytes([
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7], bytes[8],
Expand All @@ -77,7 +57,7 @@ impl Index {
(base, base + delta)
}

fn index_ranges(&self) -> Result {
pub(crate) fn index_ranges(&self) -> Result {
log::info!("Indexing ranges…");

let mut wtx = self.database.begin_write()?;
Expand Down Expand Up @@ -241,8 +221,8 @@ impl Index {
}

pub(crate) fn block(&self, height: u64) -> Result<Option<Block>> {
match self.client().get_block_hash(height) {
Ok(hash) => Ok(Some(self.client().get_block(&hash)?)),
match self.client.get_block_hash(height) {
Ok(hash) => Ok(Some(self.client.get_block(&hash)?)),
Err(bitcoincore_rpc::Error::JsonRpc(jsonrpc::error::Error::Rpc(
jsonrpc::error::RpcError { code: -8, .. },
))) => Ok(None),
Expand Down
1 change: 0 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use {
integer_sqrt::IntegerSquareRoot,
lazy_static::lazy_static,
std::{
cell::Cell,
cmp::Ordering,
collections::VecDeque,
env,
Expand Down
16 changes: 12 additions & 4 deletions src/subcommand/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,20 @@ pub(crate) struct Server {
impl Server {
pub(crate) fn run(self, options: Options) -> Result {
Runtime::new()?.block_on(async {
let index = Index::index(&options)?;
let index = Arc::new(Index::open(&options)?);

let clone = index.clone();
thread::spawn(move || loop {
if let Err(error) = clone.index_ranges() {
log::error!("{error}");
}
thread::sleep(Duration::from_millis(100));
});

let app = Router::new()
.route("/list/:outpoint", get(Self::list))
.route("/status", get(Self::status))
.layer(extract::Extension(Arc::new(Mutex::new(index))))
.layer(extract::Extension(index))
.layer(
CorsLayer::new()
.allow_methods([http::Method::GET])
Expand All @@ -43,9 +51,9 @@ impl Server {

async fn list(
extract::Path(outpoint): extract::Path<OutPoint>,
index: extract::Extension<Arc<Mutex<Index>>>,
index: extract::Extension<Arc<Index>>,
) -> impl IntoResponse {
match index.lock().unwrap().list(outpoint) {
match index.list(outpoint) {
Ok(Some(ranges)) => (StatusCode::OK, Json(Some(ranges))),
Ok(None) => (StatusCode::NOT_FOUND, Json(None)),
Err(error) => {
Expand Down
102 changes: 74 additions & 28 deletions tests/integration.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![allow(clippy::type_complexity)]

use {
crate::rpc_server::RpcServer,
bitcoin::{
Expand Down Expand Up @@ -43,6 +45,11 @@ enum Expected {
Ignore,
}

enum Event {
Block(Block),
Request(String, u16, String),
}

struct Output {
calls: Vec<String>,
stdout: String,
Expand Down Expand Up @@ -73,11 +80,10 @@ struct TransactionOptions<'a> {

struct Test {
args: Vec<String>,
blocks: Vec<Block>,
expected_status: i32,
expected_stderr: String,
expected_stderr: Option<String>,
expected_stdout: Expected,
requests: Vec<(String, String)>,
events: Vec<Event>,
tempdir: TempDir,
}

Expand All @@ -89,11 +95,10 @@ impl Test {
fn with_tempdir(tempdir: TempDir) -> Self {
Self {
args: Vec::new(),
blocks: Vec::new(),
expected_status: 0,
expected_stderr: String::new(),
expected_stderr: None,
expected_stdout: Expected::String(String::new()),
requests: Vec::new(),
events: Vec::new(),
tempdir,
}
}
Expand Down Expand Up @@ -134,7 +139,7 @@ impl Test {

fn expected_stderr(self, expected_stderr: &str) -> Self {
Self {
expected_stderr: expected_stderr.to_owned(),
expected_stderr: Some(expected_stderr.to_owned()),
..self
}
}
Expand All @@ -153,8 +158,12 @@ impl Test {
}
}

fn request(mut self, path: &str, response: &str) -> Self {
self.requests.push((path.to_string(), response.to_string()));
fn request(mut self, path: &str, status: u16, response: &str) -> Self {
self.events.push(Event::Request(
path.to_string(),
status,
response.to_string(),
));
self
}

Expand All @@ -170,19 +179,34 @@ impl Test {
self.test(Some(port)).map(|_| ())
}

fn blocks(&self) -> impl Iterator<Item = &Block> + '_ {
self.events.iter().filter_map(|event| match event {
Event::Block(block) => Some(block),
_ => None,
})
}

fn test(self, port: Option<u16>) -> Result<Output> {
for (b, block) in self.blocks.iter().enumerate() {
for (b, block) in self.blocks().enumerate() {
for (t, transaction) in block.txdata.iter().enumerate() {
eprintln!("{b}.{t}: {}", transaction.txid());
}
}

let (close_handle, calls, rpc_server_port) = RpcServer::spawn(&self.blocks);
let (blocks, close_handle, calls, rpc_server_port) = if port.is_some() {
RpcServer::spawn(Vec::new())
} else {
RpcServer::spawn(self.blocks().cloned().collect())
};

let child = Command::new(executable_path("ord"))
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.stderr(if self.expected_stderr.is_some() {
Stdio::piped()
} else {
Stdio::inherit()
})
.current_dir(&self.tempdir)
.arg(format!("--rpc-url=http://127.0.0.1:{rpc_server_port}"))
.args(self.args)
Expand Down Expand Up @@ -215,13 +239,21 @@ impl Test {
}

if healthy {
for (request, expected_response) in &self.requests {
let response = client
.get(&format!("http://127.0.0.1:{port}/{request}"))
.send()?;
assert!(response.status().is_success(), "{:?}", response.status());
assert_eq!(response.text()?, *expected_response);
successful_requests += 1;
for event in &self.events {
match event {
Event::Block(block) => {
blocks.lock().unwrap().push(block.clone());
thread::sleep(Duration::from_millis(200));
}
Event::Request(request, status, expected_response) => {
let response = client
.get(&format!("http://127.0.0.1:{port}/{request}"))
.send()?;
assert_eq!(response.status().as_u16(), *status);
assert_eq!(response.text()?, *expected_response);
successful_requests += 1;
}
}
}
}

Expand All @@ -248,7 +280,9 @@ impl Test {
print!("{}", m.as_str())
}

assert_eq!(re.replace_all(stderr, ""), self.expected_stderr);
if let Some(expected_stderr) = self.expected_stderr {
assert_eq!(re.replace_all(stderr, ""), expected_stderr);
}

match self.expected_stdout {
Expected::String(expected_stdout) => assert_eq!(stdout, expected_stdout),
Expand All @@ -262,7 +296,11 @@ impl Test {

assert_eq!(
successful_requests,
self.requests.len(),
self
.events
.iter()
.filter(|event| matches!(event, Event::Request(..)))
.count(),
"Unsuccessful requests"
);

Expand All @@ -280,11 +318,11 @@ impl Test {
}

fn block_with_coinbase(mut self, coinbase: CoinbaseOptions) -> Self {
self.blocks.push(Block {
self.events.push(Event::Block(Block {
header: BlockHeader {
version: 0,
prev_blockhash: self
.blocks
.blocks()
.last()
.map(Block::block_hash)
.unwrap_or_default(),
Expand All @@ -301,7 +339,7 @@ impl Test {
previous_output: OutPoint::null(),
script_sig: if coinbase.include_height {
script::Builder::new()
.push_scriptint(self.blocks.len().try_into().unwrap())
.push_scriptint(self.blocks().count().try_into().unwrap())
.into_script()
} else {
script::Builder::new().into_script()
Expand All @@ -317,15 +355,15 @@ impl Test {
} else {
Vec::new()
},
});
}));
self
}

fn transaction(mut self, options: TransactionOptions) -> Self {
let input_value = options
.slots
.iter()
.map(|slot| self.blocks[slot.0].txdata[slot.1].output[slot.2].value)
.map(|slot| self.blocks().nth(slot.0).unwrap().txdata[slot.1].output[slot.2].value)
.sum::<u64>();

let output_value = input_value - options.fee;
Expand All @@ -338,7 +376,7 @@ impl Test {
.iter()
.map(|slot| TxIn {
previous_output: OutPoint {
txid: self.blocks[slot.0].txdata[slot.1].txid(),
txid: self.blocks().nth(slot.0).unwrap().txdata[slot.1].txid(),
vout: slot.2 as u32,
},
script_sig: script::Builder::new().into_script(),
Expand All @@ -355,7 +393,15 @@ impl Test {
],
};

let block = self.blocks.last_mut().unwrap();
let block = self
.events
.iter_mut()
.rev()
.find_map(|event| match event {
Event::Block(block) => Some(block),
_ => None,
})
.unwrap();

block
.txdata
Expand Down
22 changes: 16 additions & 6 deletions tests/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,28 @@ pub trait RpcApi {
}

pub struct RpcServer {
blocks: Vec<Block>,
blocks: Arc<Mutex<Vec<Block>>>,
calls: Arc<Mutex<Vec<String>>>,
}

impl RpcServer {
pub(crate) fn spawn(blocks: &[Block]) -> (CloseHandle, Arc<Mutex<Vec<String>>>, u16) {
pub(crate) fn spawn(
blocks: Vec<Block>,
) -> (
Arc<Mutex<Vec<Block>>>,
CloseHandle,
Arc<Mutex<Vec<String>>>,
u16,
) {
let calls = Arc::new(Mutex::new(Vec::new()));

let blocks = Arc::new(Mutex::new(blocks));

let server = Self {
blocks: blocks.to_vec(),
blocks: blocks.clone(),
calls: calls.clone(),
};

let mut io = IoHandler::default();
io.extend_with(server.to_delegate());

Expand All @@ -39,7 +49,7 @@ impl RpcServer {

thread::spawn(|| server.wait());

(close_handle, calls, port)
(blocks, close_handle, calls, port)
}

fn call(&self, method: &str) {
Expand All @@ -51,7 +61,7 @@ impl RpcApi for RpcServer {
fn getblockhash(&self, height: usize) -> Result<BlockHash> {
self.call("getblockhash");

match self.blocks.get(height) {
match self.blocks.lock().unwrap().get(height) {
Some(block) => Ok(block.block_hash()),
None => Err(jsonrpc_core::Error::new(
jsonrpc_core::types::error::ErrorCode::ServerError(-8),
Expand All @@ -64,7 +74,7 @@ impl RpcApi for RpcServer {

assert_eq!(verbosity, 0, "Verbosity level {verbosity} is unsupported");

for block in &self.blocks {
for block in self.blocks.lock().unwrap().iter() {
if block.block_hash() == blockhash {
let mut encoded = Vec::new();
block.consensus_encode(&mut encoded).unwrap();
Expand Down
Loading