diff --git a/block-streamer/src/block_streamer.rs b/block-streamer/src/block_streamer.rs index dade0b6ec..1efb62918 100644 --- a/block-streamer/src/block_streamer.rs +++ b/block-streamer/src/block_streamer.rs @@ -100,12 +100,13 @@ pub(crate) async fn start_block_stream( chain_id: &ChainId, ) -> anyhow::Result<()> { tracing::info!( - "Starting block stream from {start_block_height} for indexer: {}", + "Starting block stream at {start_block_height} for indexer: {}", indexer.get_full_name(), ); let delta_lake_client = crate::delta_lake_client::DeltaLakeClient::new(s3_client.clone()); + // TODO move to DeltaLakeClient let start_date = get_nearest_block_date(&s3_client, start_block_height, chain_id).await?; let latest_block_metadata = delta_lake_client.get_latest_block_metadata().await?; @@ -116,6 +117,12 @@ pub(crate) async fn start_block_stream( affected_account_id, .. } => { + tracing::debug!( + "Fetching block heights starting from {} from delta lake for indexer: {}", + start_date.date_naive(), + indexer.get_full_name() + ); + // TODO Remove all block heights after start_block_height delta_lake_client .list_matching_block_heights(start_date, affected_account_id) .await @@ -128,7 +135,7 @@ pub(crate) async fn start_block_stream( } }?; - tracing::info!( + tracing::debug!( "Flushing {} block heights from index files to historical Stream for indexer: {}", blocks_from_index.len(), indexer.get_full_name(), @@ -137,6 +144,7 @@ pub(crate) async fn start_block_stream( for block in &blocks_from_index { crate::redis::xadd( redis_connection_manager, + // TODO make configurable crate::redis::generate_historical_stream_key(&indexer.get_full_name()), &[("block_height", block)], ) @@ -144,7 +152,7 @@ pub(crate) async fn start_block_stream( .context("Failed to add block to Redis Stream")?; } - let last_indexed_block = + let mut last_indexed_block = blocks_from_index .last() .map_or(last_indexed_block, |&last_block_in_index| { @@ -152,7 +160,7 @@ pub(crate) async fn start_block_stream( std::cmp::max(last_block_in_index, last_indexed_block) }); - tracing::info!( + tracing::debug!( "Starting near-lake-framework from {last_indexed_block} for indexer: {}", indexer.get_full_name(), ); @@ -167,9 +175,9 @@ pub(crate) async fn start_block_stream( let (sender, mut stream) = near_lake_framework::streamer(lake_config); - let mut filtered_block_count = 0; while let Some(streamer_message) = stream.recv().await { let block_height = streamer_message.block.header.height; + last_indexed_block = block_height; let matches = crate::rules::reduce_indexer_rule_matches( &indexer.indexer_rule, @@ -178,8 +186,6 @@ pub(crate) async fn start_block_stream( ); if !matches.is_empty() { - filtered_block_count += 1; - crate::redis::xadd( redis_connection_manager, crate::redis::generate_historical_stream_key(&indexer.get_full_name()), @@ -191,9 +197,9 @@ pub(crate) async fn start_block_stream( drop(sender); - tracing::info!( - "Flushed {} unindexed block heights to historical Stream for indexer: {}", - filtered_block_count, + tracing::debug!( + "Stopped block stream at {} for indexer: {}", + last_indexed_block, indexer.get_full_name(), ); @@ -311,7 +317,7 @@ mod tests { assert_eq!( block_date, - chrono::Utc.timestamp_nanos(1695921400989555820 as i64) + chrono::Utc.timestamp_nanos(1695921400989555820_i64) ); } @@ -386,7 +392,7 @@ mod tests { assert_eq!( block_date, - chrono::Utc.timestamp_nanos(1695921400989555820 as i64) + chrono::Utc.timestamp_nanos(1695921400989555820_i64) ); } diff --git a/block-streamer/src/delta_lake_client.rs b/block-streamer/src/delta_lake_client.rs index c7c894630..635a2c3c1 100644 --- a/block-streamer/src/delta_lake_client.rs +++ b/block-streamer/src/delta_lake_client.rs @@ -167,6 +167,13 @@ where .map(|key| async move { self.s3_client.get_text_file(DELTA_LAKE_BUCKET, &key).await }) .collect::>(); + tracing::debug!( + "Found {} index files matching {} after date {}", + futures.len(), + contract_pattern, + start_date + ); + let file_content_list = try_join_all(futures).await?; let mut block_heights: Vec<_> = file_content_list @@ -187,6 +194,12 @@ where block_heights.dedup(); } + tracing::debug!( + "Found {} matching block heights matching {}", + block_heights.len(), + contract_pattern, + ); + Ok(block_heights) } } diff --git a/block-streamer/src/rules/outcomes_reducer.rs b/block-streamer/src/rules/outcomes_reducer.rs index 72433c9b7..84d62ca11 100644 --- a/block-streamer/src/rules/outcomes_reducer.rs +++ b/block-streamer/src/rules/outcomes_reducer.rs @@ -43,7 +43,7 @@ fn build_indexer_rule_match( chain_id: ChainId, ) -> IndexerRuleMatch { IndexerRuleMatch { - chain_id: chain_id.clone(), + chain_id, indexer_rule_id: indexer_rule.id, indexer_rule_name: indexer_rule.name.clone(), payload: build_indexer_rule_match_payload( @@ -67,7 +67,7 @@ fn build_indexer_rule_match_payload( match &indexer_rule.matching_rule { MatchingRule::ActionAny { .. } | MatchingRule::ActionFunctionCall { .. } => { IndexerRuleMatchPayload::Actions { - block_hash: block_header_hash.to_string(), + block_hash: block_header_hash, receipt_id: receipt_execution_outcome.receipt.receipt_id.to_string(), transaction_hash, } @@ -101,7 +101,7 @@ fn build_indexer_rule_match_payload( .clone(); IndexerRuleMatchPayload::Events { - block_hash: block_header_hash.to_string(), + block_hash: block_header_hash, receipt_id: receipt_execution_outcome.receipt.receipt_id.to_string(), transaction_hash, event: event.event.clone(),