Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: augment predicate status returned by GET/LIST endpoints #397

Merged
merged 52 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from 43 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
aa30d42
spelling fix in tests
MicaiahReid Aug 23, 2023
96fb8dd
variable rename
MicaiahReid Aug 23, 2023
4457533
update status for first scan
MicaiahReid Aug 23, 2023
0f8ed57
update StreamingData status
MicaiahReid Aug 23, 2023
aea9e15
missed rename
MicaiahReid Aug 23, 2023
1832ec8
add more info to predicate status updates
MicaiahReid Aug 24, 2023
9eec034
doc comments
MicaiahReid Aug 24, 2023
73a9c0f
remove changes to Interrupted status
MicaiahReid Aug 24, 2023
9741167
return error details to user for failed hook actions
MicaiahReid Aug 24, 2023
3208481
add predicate streaming status update for bitcoin
MicaiahReid Aug 24, 2023
78592ba
augment LIST endpoint; merge helpers
MicaiahReid Aug 24, 2023
7742043
variable/status renames
MicaiahReid Aug 28, 2023
dc2cbd9
check end_block when evaluating blocks
MicaiahReid Aug 28, 2023
dc63688
set expired status
MicaiahReid Aug 29, 2023
8b5fa75
missed updates in setting expired status
MicaiahReid Aug 29, 2023
3166833
fix blockheight after scan
MicaiahReid Aug 29, 2023
3b47b33
simplify fix for blockheight after scan
MicaiahReid Aug 29, 2023
4fd2ebb
fix expiration tracking
MicaiahReid Aug 30, 2023
ba775a5
revert stuff moved to other PR
MicaiahReid Aug 30, 2023
6295b86
allow expired => streaming/scanning status update
MicaiahReid Aug 30, 2023
6530b6a
remove unused var
MicaiahReid Aug 30, 2023
726024e
add doc images
MicaiahReid Aug 30, 2023
e68b095
link to doc image
MicaiahReid Aug 30, 2023
eb08eda
expire blocks once confirmed by chain
MicaiahReid Sep 6, 2023
636f510
create db on readonly connect if not exist
MicaiahReid Sep 11, 2023
8ab1dd0
add some serde derivations
MicaiahReid Sep 11, 2023
53080f7
only enable predicate if not expired
MicaiahReid Sep 11, 2023
c7b4d7a
unimplemented => noop
MicaiahReid Sep 11, 2023
01ca09b
add tests for chainhook service get status api
MicaiahReid Sep 11, 2023
55298f3
fix: handle service restarts (#400)
MicaiahReid Sep 11, 2023
ead4854
clean up some tests
MicaiahReid Sep 12, 2023
994d84c
fix some imports
MicaiahReid Sep 12, 2023
8038227
ignore tests that won't work in ci
MicaiahReid Sep 12, 2023
aa39c96
set expired status for bitcoin after scan
MicaiahReid Sep 13, 2023
1b17af6
remove redundant variable
MicaiahReid Sep 13, 2023
9c42189
rename expiration statuses
MicaiahReid Sep 13, 2023
f6970c7
make expired field expired_at
MicaiahReid Sep 13, 2023
7d7121c
update doc images
MicaiahReid Sep 13, 2023
953eb79
fix boolean
MicaiahReid Sep 13, 2023
a66236f
add logs to tests
MicaiahReid Sep 13, 2023
862e6cb
enable backtrace on ci
MicaiahReid Sep 13, 2023
cb37e3a
Merge branch 'develop' into augment-get-chainhooks
MicaiahReid Sep 13, 2023
bc8ca69
test panic on acquire lock
MicaiahReid Sep 13, 2023
0dba37d
undo panic
MicaiahReid Sep 13, 2023
a7cd210
run ci tests in single thread
MicaiahReid Sep 13, 2023
e28901f
remove `cargo check` from CI
MicaiahReid Sep 13, 2023
af45ce0
panic on failed registration
MicaiahReid Sep 13, 2023
f4ec749
fix: update metrics before sending event_observer_tx
MicaiahReid Sep 13, 2023
2b74dfb
fix: deregister bitcoin predicate instead of stacks
MicaiahReid Sep 14, 2023
0c82f66
fix dockerfile path
MicaiahReid Sep 14, 2023
e9f17de
update rust toolchain
MicaiahReid Sep 14, 2023
62d8dec
set rust version for dockerfile
MicaiahReid Sep 14, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
run: |
rustup update
cargo check
cargo test --all
RUST_BACKTRACE=1 cargo test --all

- name: Semantic Release
uses: cycjimmy/semantic-release-action@v3
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@ components/chainhook-types-js/dist
*.rdb
*.redb
cache/

components/chainhook-cli/src/service/tests/fixtures/tmp
9 changes: 3 additions & 6 deletions components/chainhook-cli/src/cli/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::config::generator::generate_config;
use crate::config::{Config, PredicatesApi};
use crate::config::Config;
use crate::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate;
use crate::scan::stacks::{
consolidate_local_stacks_chainstate_using_csv, scan_stacks_chainstate_via_csv_using_predicate,
Expand Down Expand Up @@ -291,12 +291,8 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
match opts.command {
Command::Service(subcmd) => match subcmd {
ServiceCommand::Start(cmd) => {
let mut config =
let config =
Config::default(cmd.devnet, cmd.testnet, cmd.mainnet, &cmd.config_path)?;
// We disable the API if a predicate was passed, and the --enable-
if cmd.predicates_paths.len() > 0 && !cmd.start_http_api {
config.http_api = PredicatesApi::Off;
}

let predicates = cmd
.predicates_paths
Expand Down Expand Up @@ -462,6 +458,7 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {

scan_bitcoin_chainstate_via_rpc_using_predicate(
&predicate_spec,
None,
&config,
&ctx,
)
Expand Down
165 changes: 113 additions & 52 deletions components/chainhook-cli/src/scan/bitcoin.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::config::{Config, PredicatesApi};
use crate::service::{
open_readwrite_predicates_db_conn_or_panic, update_predicate_status, PredicateStatus,
ScanningData,
open_readwrite_predicates_db_conn_or_panic, set_predicate_scanning_status,
set_unconfirmed_expiration_status, ScanningData,
};
use chainhook_sdk::bitcoincore_rpc::RpcApi;
use chainhook_sdk::bitcoincore_rpc::{Auth, Client};
Expand All @@ -16,16 +16,17 @@ use chainhook_sdk::indexer::bitcoin::{
};
use chainhook_sdk::observer::{gather_proofs, EventObserverConfig};
use chainhook_sdk::types::{
BitcoinBlockData, BitcoinChainEvent, BitcoinChainUpdatedWithBlocksData,
BitcoinBlockData, BitcoinChainEvent, BitcoinChainUpdatedWithBlocksData, BlockIdentifier, Chain,
};
use chainhook_sdk::utils::{file_append, send_request, BlockHeights, Context};
use std::collections::HashMap;

pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
predicate_spec: &BitcoinChainhookSpecification,
unfinished_scan_data: Option<ScanningData>,
config: &Config,
ctx: &Context,
) -> Result<(), String> {
) -> Result<bool, String> {
let auth = Auth::UserPass(
config.network.bitcoind_rpc_username.clone(),
config.network.bitcoind_rpc_password.clone(),
Expand All @@ -43,43 +44,70 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
BlockHeights::Blocks(blocks.clone()).get_sorted_entries()
} else {
let start_block = match predicate_spec.start_block {
Some(start_block) => start_block,
Some(start_block) => match &unfinished_scan_data {
Some(scan_data) => scan_data.last_evaluated_block_height,
None => start_block,
},
None => {
return Err(
"Bitcoin chainhook specification must include a field start_block in replay mode"
.into(),
);
}
};
let (end_block, update_end_block) = match predicate_spec.end_block {
Some(end_block) => (end_block, false),
None => match bitcoin_rpc.get_blockchain_info() {
Ok(result) => (result.blocks, true),
Err(e) => {
return Err(format!(
"unable to retrieve Bitcoin chain tip ({})",
e.to_string()
));
let (end_block, update_end_block) = match bitcoin_rpc.get_blockchain_info() {
Ok(result) => match predicate_spec.end_block {
Some(end_block) => {
if end_block > result.blocks {
(result.blocks, true)
} else {
(end_block, false)
}
}
None => (result.blocks, true),
},
Err(e) => {
return Err(format!(
"unable to retrieve Bitcoin chain tip ({})",
e.to_string()
));
}
};
floating_end_block = update_end_block;
BlockHeights::BlockRange(start_block, end_block).get_sorted_entries()
};

let mut predicates_db_conn = match config.http_api {
PredicatesApi::On(ref api_config) => {
Some(open_readwrite_predicates_db_conn_or_panic(api_config, ctx))
}
PredicatesApi::Off => None,
};

info!(
ctx.expect_logger(),
"Starting predicate evaluation on Bitcoin blocks",
);

let mut last_block_scanned = BlockIdentifier::default();
let mut actions_triggered = 0;
let mut err_count = 0;

let event_observer_config = config.get_event_observer_config();
let bitcoin_config = event_observer_config.get_bitcoin_config();
let number_of_blocks_to_scan = block_heights_to_scan.len() as u64;
let mut number_of_blocks_scanned = 0;
let mut number_of_blocks_sent = 0u64;

let (mut number_of_blocks_to_scan, mut number_of_blocks_scanned, mut number_of_times_triggered) = {
let number_of_blocks_to_scan = block_heights_to_scan.len() as u64;
match &unfinished_scan_data {
Some(scan_data) => (
scan_data.number_of_blocks_to_scan,
scan_data.number_of_blocks_evaluated,
scan_data.number_of_times_triggered,
),
None => (number_of_blocks_to_scan, 0, 0u64),
}
};

let http_client = build_http_client();

while let Some(current_block_height) = block_heights_to_scan.pop_front() {
Expand Down Expand Up @@ -109,8 +137,9 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
continue;
}
};
last_block_scanned = block.block_identifier.clone();

match process_block_with_predicates(
let res = match process_block_with_predicates(
block,
&vec![&predicate_spec],
&event_observer_config,
Expand All @@ -120,81 +149,113 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
{
Ok(actions) => {
if actions > 0 {
number_of_blocks_sent += 1;
number_of_times_triggered += 1;
}
actions_triggered += actions
actions_triggered += actions;
Ok(())
}
Err(_) => err_count += 1,
}
Err(e) => {
err_count += 1;
Err(e)
}
};

if err_count >= 3 {
return Err(format!("Scan aborted (consecutive action errors >= 3)"));
if res.is_err() {
return Err(format!(
"Scan aborted (consecutive action errors >= 3): {}",
res.unwrap_err()
));
} else {
return Err(format!("Scan aborted (consecutive action errors >= 3)"));
}
}

if let PredicatesApi::On(ref api_config) = config.http_api {
if number_of_blocks_scanned % 50 == 0 {
let status = PredicateStatus::Scanning(ScanningData {
if let Some(ref mut predicates_db_conn) = predicates_db_conn {
if number_of_blocks_scanned % 10 == 0 || number_of_blocks_scanned == 1 {
set_predicate_scanning_status(
&predicate_spec.key(),
number_of_blocks_to_scan,
number_of_blocks_scanned,
number_of_blocks_sent,
number_of_times_triggered,
current_block_height,
});
let mut predicates_db_conn =
open_readwrite_predicates_db_conn_or_panic(api_config, &ctx);
update_predicate_status(
&predicate_spec.key(),
status,
&mut predicates_db_conn,
&ctx,
)
predicates_db_conn,
ctx,
);
}
}

if block_heights_to_scan.is_empty() && floating_end_block {
match bitcoin_rpc.get_blockchain_info() {
Ok(result) => {
for entry in (current_block_height + 1)..result.blocks {
block_heights_to_scan.push_back(entry);
let new_tip = match bitcoin_rpc.get_blockchain_info() {
Ok(result) => match predicate_spec.end_block {
Some(end_block) => {
if end_block > result.blocks {
result.blocks
} else {
end_block
}
}
}
None => result.blocks,
},
Err(_e) => {
continue;
}
};

for entry in (current_block_height + 1)..new_tip {
block_heights_to_scan.push_back(entry);
}
number_of_blocks_to_scan += block_heights_to_scan.len() as u64;
}
}
info!(
ctx.expect_logger(),
"{number_of_blocks_scanned} blocks scanned, {actions_triggered} actions triggered"
);

if let PredicatesApi::On(ref api_config) = config.http_api {
let status = PredicateStatus::Scanning(ScanningData {
if let Some(ref mut predicates_db_conn) = predicates_db_conn {
if let Some(predicate_end_block) = predicate_spec.end_block {
if predicate_end_block == last_block_scanned.index {
// todo: we need to find a way to check if this block is confirmed
// and if so, set the status to confirmed expiration
set_unconfirmed_expiration_status(
&Chain::Bitcoin,
number_of_blocks_scanned,
predicate_end_block,
&predicate_spec.key(),
predicates_db_conn,
ctx,
);
return Ok(true);
}
}
set_predicate_scanning_status(
&predicate_spec.key(),
number_of_blocks_to_scan,
number_of_blocks_scanned,
number_of_blocks_sent,
current_block_height: 0,
});
let mut predicates_db_conn = open_readwrite_predicates_db_conn_or_panic(api_config, &ctx);
update_predicate_status(&predicate_spec.key(), status, &mut predicates_db_conn, &ctx)
number_of_times_triggered,
last_block_scanned.index,
predicates_db_conn,
ctx,
);
}

Ok(())
return Ok(false);
}

pub async fn process_block_with_predicates(
block: BitcoinBlockData,
predicates: &Vec<&BitcoinChainhookSpecification>,
event_observer_config: &EventObserverConfig,
ctx: &Context,
) -> Result<u32, ()> {
) -> Result<u32, String> {
let chain_event =
BitcoinChainEvent::ChainUpdatedWithBlocks(BitcoinChainUpdatedWithBlocksData {
new_blocks: vec![block],
confirmed_blocks: vec![],
});

let (predicates_triggered, _predicates_evaluated) =
let (predicates_triggered, _predicates_evaluated, _predicates_expired) =
evaluate_bitcoin_chainhooks_on_chain_event(&chain_event, predicates, ctx);

execute_predicates_action(predicates_triggered, &event_observer_config, &ctx).await
Expand All @@ -204,7 +265,7 @@ pub async fn execute_predicates_action<'a>(
hits: Vec<BitcoinTriggerChainhook<'a>>,
config: &EventObserverConfig,
ctx: &Context,
) -> Result<u32, ()> {
) -> Result<u32, String> {
let mut actions_triggered = 0;
let mut proofs = HashMap::new();
for trigger in hits.into_iter() {
Expand Down
Loading
Loading