Skip to content

Commit

Permalink
chore: Add more logging
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Nov 24, 2023
1 parent a490c6d commit fb6f0cf
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 15 deletions.
30 changes: 18 additions & 12 deletions block-streamer/src/block_streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,13 @@ pub(crate) async fn start_block_stream(
chain_id: &ChainId,
) -> anyhow::Result<()> {
tracing::info!(
"Starting block stream from {start_block_height} for indexer: {}",
"Starting block stream at {start_block_height} for indexer: {}",
indexer.get_full_name(),
);

let delta_lake_client = crate::delta_lake_client::DeltaLakeClient::new(s3_client.clone());

// TODO move to DeltaLakeClient
let start_date = get_nearest_block_date(&s3_client, start_block_height, chain_id).await?;

let latest_block_metadata = delta_lake_client.get_latest_block_metadata().await?;
Expand All @@ -116,6 +117,12 @@ pub(crate) async fn start_block_stream(
affected_account_id,
..
} => {
tracing::debug!(
"Fetching block heights starting from {} from delta lake for indexer: {}",
start_date.date_naive(),
indexer.get_full_name()
);
// TODO Remove all block heights after start_block_height
delta_lake_client
.list_matching_block_heights(start_date, affected_account_id)
.await
Expand All @@ -128,7 +135,7 @@ pub(crate) async fn start_block_stream(
}
}?;

tracing::info!(
tracing::debug!(
"Flushing {} block heights from index files to historical Stream for indexer: {}",
blocks_from_index.len(),
indexer.get_full_name(),
Expand All @@ -137,22 +144,23 @@ pub(crate) async fn start_block_stream(
for block in &blocks_from_index {
crate::redis::xadd(
redis_connection_manager,
// TODO make configurable
crate::redis::generate_historical_stream_key(&indexer.get_full_name()),
&[("block_height", block)],
)
.await
.context("Failed to add block to Redis Stream")?;
}

let last_indexed_block =
let mut last_indexed_block =
blocks_from_index
.last()
.map_or(last_indexed_block, |&last_block_in_index| {
// Check for the case where index files are written right after we fetch the last_indexed_block metadata
std::cmp::max(last_block_in_index, last_indexed_block)
});

tracing::info!(
tracing::debug!(
"Starting near-lake-framework from {last_indexed_block} for indexer: {}",
indexer.get_full_name(),
);
Expand All @@ -167,9 +175,9 @@ pub(crate) async fn start_block_stream(

let (sender, mut stream) = near_lake_framework::streamer(lake_config);

let mut filtered_block_count = 0;
while let Some(streamer_message) = stream.recv().await {
let block_height = streamer_message.block.header.height;
last_indexed_block = block_height;

let matches = crate::rules::reduce_indexer_rule_matches(
&indexer.indexer_rule,
Expand All @@ -178,8 +186,6 @@ pub(crate) async fn start_block_stream(
);

if !matches.is_empty() {
filtered_block_count += 1;

crate::redis::xadd(
redis_connection_manager,
crate::redis::generate_historical_stream_key(&indexer.get_full_name()),
Expand All @@ -191,9 +197,9 @@ pub(crate) async fn start_block_stream(

drop(sender);

tracing::info!(
"Flushed {} unindexed block heights to historical Stream for indexer: {}",
filtered_block_count,
tracing::debug!(
"Stopped block stream at {} for indexer: {}",
last_indexed_block,
indexer.get_full_name(),
);

Expand Down Expand Up @@ -311,7 +317,7 @@ mod tests {

assert_eq!(
block_date,
chrono::Utc.timestamp_nanos(1695921400989555820 as i64)
chrono::Utc.timestamp_nanos(1695921400989555820_i64)
);
}

Expand Down Expand Up @@ -386,7 +392,7 @@ mod tests {

assert_eq!(
block_date,
chrono::Utc.timestamp_nanos(1695921400989555820 as i64)
chrono::Utc.timestamp_nanos(1695921400989555820_i64)
);
}

Expand Down
13 changes: 13 additions & 0 deletions block-streamer/src/delta_lake_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,13 @@ where
.map(|key| async move { self.s3_client.get_text_file(DELTA_LAKE_BUCKET, &key).await })
.collect::<Vec<_>>();

tracing::debug!(
"Found {} index files matching {} after date {}",
futures.len(),
contract_pattern,
start_date
);

let file_content_list = try_join_all(futures).await?;

let mut block_heights: Vec<_> = file_content_list
Expand All @@ -187,6 +194,12 @@ where
block_heights.dedup();
}

tracing::debug!(
"Found {} matching block heights matching {}",
block_heights.len(),
contract_pattern,
);

Ok(block_heights)
}
}
Expand Down
6 changes: 3 additions & 3 deletions block-streamer/src/rules/outcomes_reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ fn build_indexer_rule_match(
chain_id: ChainId,
) -> IndexerRuleMatch {
IndexerRuleMatch {
chain_id: chain_id.clone(),
chain_id,
indexer_rule_id: indexer_rule.id,
indexer_rule_name: indexer_rule.name.clone(),
payload: build_indexer_rule_match_payload(
Expand All @@ -67,7 +67,7 @@ fn build_indexer_rule_match_payload(
match &indexer_rule.matching_rule {
MatchingRule::ActionAny { .. } | MatchingRule::ActionFunctionCall { .. } => {
IndexerRuleMatchPayload::Actions {
block_hash: block_header_hash.to_string(),
block_hash: block_header_hash,
receipt_id: receipt_execution_outcome.receipt.receipt_id.to_string(),
transaction_hash,
}
Expand Down Expand Up @@ -101,7 +101,7 @@ fn build_indexer_rule_match_payload(
.clone();

IndexerRuleMatchPayload::Events {
block_hash: block_header_hash.to_string(),
block_hash: block_header_hash,
receipt_id: receipt_execution_outcome.receipt.receipt_id.to_string(),
transaction_hash,
event: event.event.clone(),
Expand Down

0 comments on commit fb6f0cf

Please sign in to comment.