Skip to content

Commit

Permalink
add predicate streaming status update for bitcoin
Browse files Browse the repository at this point in the history
  • Loading branch information
MicaiahReid committed Aug 24, 2023
1 parent 182dd7a commit 2c458a6
Showing 1 changed file with 38 additions and 21 deletions.
59 changes: 38 additions & 21 deletions components/chainhook-cli/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::storage::{
use chainhook_sdk::chainhooks::types::{ChainhookConfig, ChainhookFullSpecification};

use chainhook_sdk::chainhooks::types::ChainhookSpecification;
use chainhook_sdk::observer::{start_event_observer, ObserverEvent};
use chainhook_sdk::observer::{start_event_observer, ObserverEvent, PredicateEvaluationReport};
use chainhook_sdk::types::StacksChainEvent;
use chainhook_sdk::utils::Context;
use redis::{Commands, Connection};
Expand Down Expand Up @@ -274,8 +274,11 @@ impl Service {
}
}
}
ObserverEvent::BitcoinChainEvent((_chain_update, _report)) => {
ObserverEvent::BitcoinChainEvent((_chain_update, report)) => {
debug!(self.ctx.expect_logger(), "Bitcoin update not stored");
if let Some(ref mut predicates_db_conn) = predicates_db_conn {
update_streaming_status_from_report(report, predicates_db_conn, &ctx);
}
}
ObserverEvent::StacksChainEvent((chain_event, report)) => {
let stacks_db_conn_rw = match open_readwrite_stacks_db_conn(
Expand Down Expand Up @@ -322,23 +325,7 @@ impl Service {
| StacksChainEvent::ChainUpdatedWithMicroblocksReorg(_) => {}
};
if let Some(ref mut predicates_db_conn) = predicates_db_conn {
for (predicate_uuid, _blocks_ids) in report.predicates_evaluated.iter() {
set_predicate_streaming_status(
StreamingDataType::Evaluation,
&(ChainhookSpecification::stacks_key(predicate_uuid)),
predicates_db_conn,
&ctx,
);
}

for (predicate_uuid, _blocks_ids) in report.predicates_triggered.iter() {
set_predicate_streaming_status(
StreamingDataType::Occurrence,
&(ChainhookSpecification::stacks_key(predicate_uuid)),
predicates_db_conn,
&ctx,
);
}
update_streaming_status_from_report(report, predicates_db_conn, &ctx);
}
// Every 32 blocks, we will check if there's a new Stacks file archive to ingest
if stacks_event > 32 {
Expand Down Expand Up @@ -395,6 +382,33 @@ pub struct CompletedScanData {
pub final_block_height_scanned: u64,
}

fn update_streaming_status_from_report(
report: PredicateEvaluationReport,
predicates_db_conn: &mut Connection,
ctx: &Context,
) {
for (predicate_uuid, _blocks_ids) in report.predicates_triggered.iter() {
set_predicate_streaming_status(
StreamingDataType::Occurrence,
&(ChainhookSpecification::either_stx_or_btc_key(predicate_uuid)),
predicates_db_conn,
&ctx,
);
}

for (predicate_uuid, _blocks_ids) in report.predicates_evaluated.iter() {
if report.predicates_triggered.contains_key(predicate_uuid) {
continue;
}
set_predicate_streaming_status(
StreamingDataType::Evaluation,
&(ChainhookSpecification::either_stx_or_btc_key(predicate_uuid)),
predicates_db_conn,
&ctx,
);
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum StreamingDataType {
Occurrence,
Expand All @@ -416,7 +430,7 @@ fn set_predicate_streaming_status(
.duration_since(UNIX_EPOCH)
.expect("Could not get current time in ms")
.as_millis();
let (mut last_occurrence, number_of_times_triggered) = {
let (mut last_occurrence, mut number_of_times_triggered) = {
let current_status = retrieve_predicate_status(&predicate_key, predicates_db_conn);
match current_status {
Some(status) => match status {
Expand All @@ -438,7 +452,10 @@ fn set_predicate_streaming_status(
}
};
match streaming_data_type {
StreamingDataType::Occurrence => last_occurrence = now_ms,
StreamingDataType::Occurrence => {
last_occurrence = now_ms;
number_of_times_triggered += 1;
}
_ => {}
}

Expand Down

0 comments on commit 2c458a6

Please sign in to comment.