Skip to content

Commit

Permalink
fix: handle service restarts (#400)
Browse files Browse the repository at this point in the history
### Description

Improves how we handle a restart of `chainhook service` while predicates
are scanning/streaming. Here are the cases we now handle:
1. Predicates that were in `scanning` status when Chainhook was
terminated will resume scanning starting from their
`last_evaluated_block_height`. *Note: because we only save predicate
status every 10 scans, we could end up re-emiting matches on a resetart*
2. Predicates that were in `new` status when Chainhook was terminated
will start scanning at the predicate's `start_block`
3. Predicates that were in `streaming` status will _return_ to a
`scanning` status, starting at `last_evaluated_block_height` to catch up
on the missed blocks. Note, the `number_of_blocks_to_scan` is set to 0
for this temporary catch-up, as it's difficult to compute the number of
remaining blocks in the context of this change
4. If predicates were passed in at startup, we also register those to
begin scanning, which previously didn't happen
5. We now allow passing in a predicate at startup _and_ registering
additional predicates with the predicate registration server. This means
that if you use the same startup predicate repeatedly, it will already
be saved in redis and _not_ be reloaded.

Fixes: #298, fixes #390, fixes #402, fixes #403


---

### Checklist

- [x] All tests pass
- [ ] Tests added in this PR (if applicable)
  • Loading branch information
MicaiahReid authored Sep 11, 2023
1 parent 01ca09b commit 55298f3
Show file tree
Hide file tree
Showing 7 changed files with 233 additions and 51 deletions.
9 changes: 3 additions & 6 deletions components/chainhook-cli/src/cli/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::config::generator::generate_config;
use crate::config::{Config, PredicatesApi};
use crate::config::Config;
use crate::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate;
use crate::scan::stacks::{
consolidate_local_stacks_chainstate_using_csv, scan_stacks_chainstate_via_csv_using_predicate,
Expand Down Expand Up @@ -291,12 +291,8 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
match opts.command {
Command::Service(subcmd) => match subcmd {
ServiceCommand::Start(cmd) => {
let mut config =
let config =
Config::default(cmd.devnet, cmd.testnet, cmd.mainnet, &cmd.config_path)?;
// We disable the API if a predicate was passed, and the --enable-
if cmd.predicates_paths.len() > 0 && !cmd.start_http_api {
config.http_api = PredicatesApi::Off;
}

let predicates = cmd
.predicates_paths
Expand Down Expand Up @@ -462,6 +458,7 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {

scan_bitcoin_chainstate_via_rpc_using_predicate(
&predicate_spec,
None,
&config,
&ctx,
)
Expand Down
72 changes: 52 additions & 20 deletions components/chainhook-cli/src/scan/bitcoin.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::config::{Config, PredicatesApi};
use crate::service::{open_readwrite_predicates_db_conn_or_panic, set_predicate_scanning_status};
use crate::service::{
open_readwrite_predicates_db_conn_or_panic, set_predicate_scanning_status, ScanningData,
};
use chainhook_sdk::bitcoincore_rpc::RpcApi;
use chainhook_sdk::bitcoincore_rpc::{Auth, Client};
use chainhook_sdk::chainhooks::bitcoin::{
Expand All @@ -20,6 +22,7 @@ use std::collections::HashMap;

pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
predicate_spec: &BitcoinChainhookSpecification,
unfinished_scan_data: Option<ScanningData>,
config: &Config,
ctx: &Context,
) -> Result<(), String> {
Expand All @@ -40,25 +43,34 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
BlockHeights::Blocks(blocks.clone()).get_sorted_entries()
} else {
let start_block = match predicate_spec.start_block {
Some(start_block) => start_block,
Some(start_block) => match &unfinished_scan_data {
Some(scan_data) => scan_data.last_evaluated_block_height,
None => start_block,
},
None => {
return Err(
"Bitcoin chainhook specification must include a field start_block in replay mode"
.into(),
);
}
};
let (end_block, update_end_block) = match predicate_spec.end_block {
Some(end_block) => (end_block, false),
None => match bitcoin_rpc.get_blockchain_info() {
Ok(result) => (result.blocks, true),
Err(e) => {
return Err(format!(
"unable to retrieve Bitcoin chain tip ({})",
e.to_string()
));
let (end_block, update_end_block) = match bitcoin_rpc.get_blockchain_info() {
Ok(result) => match predicate_spec.end_block {
Some(end_block) => {
if end_block > result.blocks {
(result.blocks, true)
} else {
(end_block, false)
}
}
None => (result.blocks, true),
},
Err(e) => {
return Err(format!(
"unable to retrieve Bitcoin chain tip ({})",
e.to_string()
));
}
};
floating_end_block = update_end_block;
BlockHeights::BlockRange(start_block, end_block).get_sorted_entries()
Expand All @@ -82,9 +94,19 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(

let event_observer_config = config.get_event_observer_config();
let bitcoin_config = event_observer_config.get_bitcoin_config();
let number_of_blocks_to_scan = block_heights_to_scan.len() as u64;
let mut number_of_blocks_scanned = 0;
let mut number_of_times_triggered = 0u64;

let (mut number_of_blocks_to_scan, mut number_of_blocks_scanned, mut number_of_times_triggered) = {
let number_of_blocks_to_scan = block_heights_to_scan.len() as u64;
match &unfinished_scan_data {
Some(scan_data) => (
scan_data.number_of_blocks_to_scan,
scan_data.number_of_blocks_evaluated,
scan_data.number_of_times_triggered,
),
None => (number_of_blocks_to_scan, 0, 0u64),
}
};

let http_client = build_http_client();

while let Some(current_block_height) = block_heights_to_scan.pop_front() {
Expand Down Expand Up @@ -149,7 +171,7 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
}

if let Some(ref mut predicates_db_conn) = predicates_db_conn {
if number_of_blocks_scanned % 50 == 0 || number_of_blocks_scanned == 1 {
if number_of_blocks_scanned % 10 == 0 || number_of_blocks_scanned == 1 {
set_predicate_scanning_status(
&predicate_spec.key(),
number_of_blocks_to_scan,
Expand All @@ -163,16 +185,26 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
}

if block_heights_to_scan.is_empty() && floating_end_block {
match bitcoin_rpc.get_blockchain_info() {
Ok(result) => {
for entry in (current_block_height + 1)..result.blocks {
block_heights_to_scan.push_back(entry);
let new_tip = match bitcoin_rpc.get_blockchain_info() {
Ok(result) => match predicate_spec.end_block {
Some(end_block) => {
if end_block > result.blocks {
result.blocks
} else {
end_block
}
}
}
None => result.blocks,
},
Err(_e) => {
continue;
}
};

for entry in (current_block_height + 1)..new_tip {
block_heights_to_scan.push_back(entry);
}
number_of_blocks_to_scan += block_heights_to_scan.len() as u64;
}
}
info!(
Expand Down
74 changes: 66 additions & 8 deletions components/chainhook-cli/src/scan/stacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
config::{Config, PredicatesApi},
service::{
open_readwrite_predicates_db_conn_or_panic, set_expired_safe_status,
set_expired_unsafe_status, set_predicate_scanning_status, Chain,
set_expired_unsafe_status, set_predicate_scanning_status, Chain, ScanningData,
},
storage::{
get_last_block_height_inserted, get_last_unconfirmed_block_height_inserted,
Expand Down Expand Up @@ -145,6 +145,7 @@ pub async fn get_canonical_fork_from_tsv(

pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
predicate_spec: &StacksChainhookSpecification,
unfinished_scan_data: Option<ScanningData>,
stacks_db_conn: &DB,
config: &Config,
ctx: &Context,
Expand All @@ -155,7 +156,10 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
BlockHeights::Blocks(blocks.clone()).get_sorted_entries()
} else {
let start_block = match predicate_spec.start_block {
Some(start_block) => start_block,
Some(start_block) => match &unfinished_scan_data {
Some(scan_data) => scan_data.last_evaluated_block_height,
None => start_block,
},
None => {
return Err(
"Chainhook specification must include fields 'start_block' when using the scan command"
Expand All @@ -165,7 +169,31 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
};

let (end_block, update_end_block) = match predicate_spec.end_block {
Some(end_block) => (end_block, false),
Some(end_block) => {
// if the user provided an end block that is above the chain tip, we'll
// only scan up to the chain tip, then go to streaming mode
match get_last_unconfirmed_block_height_inserted(stacks_db_conn, ctx) {
Some(chain_tip) => {
if end_block > chain_tip {
(chain_tip, true)
} else {
(end_block, false)
}
}
None => match get_last_block_height_inserted(stacks_db_conn, ctx) {
Some(chain_tip) => {
if end_block > chain_tip {
(chain_tip, true)
} else {
(end_block, false)
}
}
None => {
return Err("Chainhook specification must include fields 'end_block' when using the scan command".into());
}
},
}
}
None => match get_last_unconfirmed_block_height_inserted(stacks_db_conn, ctx) {
Some(end_block) => (end_block, true),
None => match get_last_block_height_inserted(stacks_db_conn, ctx) {
Expand Down Expand Up @@ -199,9 +227,18 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
);
let mut last_block_scanned = BlockIdentifier::default();
let mut err_count = 0;
let number_of_blocks_to_scan = block_heights_to_scan.len() as u64;
let mut number_of_blocks_scanned = 0;
let mut number_of_times_triggered = 0u64;

let (mut number_of_blocks_to_scan, mut number_of_blocks_scanned, mut number_of_times_triggered) = {
let number_of_blocks_to_scan = block_heights_to_scan.len() as u64;
match &unfinished_scan_data {
Some(scan_data) => (
scan_data.number_of_blocks_to_scan,
scan_data.number_of_blocks_evaluated,
scan_data.number_of_times_triggered,
),
None => (number_of_blocks_to_scan, 0, 0u64),
}
};

while let Some(current_block_height) = block_heights_to_scan.pop_front() {
number_of_blocks_scanned += 1; // todo: can we remove this and just use `blocks_scanned`?
Expand Down Expand Up @@ -285,7 +322,7 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
}

if let Some(ref mut predicates_db_conn) = predicates_db_conn {
if blocks_scanned % 5000 == 0 || blocks_scanned == 1 {
if blocks_scanned % 10 == 0 || blocks_scanned == 1 {
set_predicate_scanning_status(
&predicate_spec.key(),
number_of_blocks_to_scan,
Expand All @@ -301,7 +338,27 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
// Update end_block, in case a new block was discovered during the scan
if block_heights_to_scan.is_empty() && floating_end_block {
let new_tip = match predicate_spec.end_block {
Some(end_block) => end_block,
Some(end_block) => {
match get_last_unconfirmed_block_height_inserted(stacks_db_conn, ctx) {
Some(chain_tip) => {
if end_block > chain_tip {
chain_tip
} else {
end_block
}
}
None => match get_last_block_height_inserted(stacks_db_conn, ctx) {
Some(chain_tip) => {
if end_block > chain_tip {
chain_tip
} else {
end_block
}
}
None => current_block_height,
},
}
}
None => match get_last_unconfirmed_block_height_inserted(stacks_db_conn, ctx) {
Some(end_block) => end_block,
None => match get_last_block_height_inserted(stacks_db_conn, ctx) {
Expand All @@ -313,6 +370,7 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
for entry in (current_block_height + 1)..=new_tip {
block_heights_to_scan.push_back(entry);
}
number_of_blocks_to_scan += block_heights_to_scan.len() as u64;
}
}
info!(
Expand Down
Loading

0 comments on commit 55298f3

Please sign in to comment.