diff --git a/block-streamer/examples/start_stream.rs b/block-streamer/examples/start_stream.rs index 62dc053c8..79bead1ea 100644 --- a/block-streamer/examples/start_stream.rs +++ b/block-streamer/examples/start_stream.rs @@ -13,6 +13,7 @@ async fn main() -> Result<(), Box> { account_id: "morgs.near".to_string(), function_name: "test".to_string(), version: 0, + redis_stream: "morgs.near/test:block_stream".to_string(), rule: Some(Rule::ActionAnyRule(ActionAnyRule { affected_account_id: "social.near".to_string(), status: Status::Success.into(), diff --git a/block-streamer/proto/block_streamer.proto b/block-streamer/proto/block_streamer.proto index 21393577e..5d1e0517a 100644 --- a/block-streamer/proto/block_streamer.proto +++ b/block-streamer/proto/block_streamer.proto @@ -24,10 +24,12 @@ message StartStreamRequest { string function_name = 3; // Block height corresponding to the created/updated height of the indexer uint64 version = 4; + // Key of Redis Stream to publish blocks to + string redis_stream = 5; // Filter rule to apply to incoming blocks oneof rule { - ActionAnyRule action_any_rule = 5; - ActionFunctionCallRule action_function_call_rule = 6; + ActionAnyRule action_any_rule = 6; + ActionFunctionCallRule action_function_call_rule = 7; } } diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index 8fb18da89..4c2be5a8e 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -20,15 +20,22 @@ pub struct BlockStream { pub indexer_config: IndexerConfig, pub chain_id: ChainId, pub version: u64, + pub redis_stream: String, } impl BlockStream { - pub fn new(indexer_config: IndexerConfig, chain_id: ChainId, version: u64) -> Self { + pub fn new( + indexer_config: IndexerConfig, + chain_id: ChainId, + version: u64, + redis_stream: String, + ) -> Self { Self { task: None, indexer_config, chain_id, version, + redis_stream, } } @@ -48,6 +55,7 @@ impl BlockStream { let indexer_config = self.indexer_config.clone(); let chain_id = self.chain_id.clone(); + let redis_stream = self.redis_stream.clone(); let handle = tokio::spawn(async move { tokio::select! { @@ -67,7 +75,8 @@ impl BlockStream { delta_lake_client, lake_s3_config, &chain_id, - LAKE_PREFETCH_SIZE + LAKE_PREFETCH_SIZE, + redis_stream ) => { result.map_err(|err| { tracing::error!( @@ -112,6 +121,7 @@ pub(crate) async fn start_block_stream( lake_s3_config: aws_sdk_s3::Config, chain_id: &ChainId, lake_prefetch_size: usize, + redis_stream: String, ) -> anyhow::Result<()> { tracing::info!( account_id = indexer.account_id.as_str(), @@ -161,10 +171,7 @@ pub(crate) async fn start_block_stream( for block in &blocks_from_index { let block = block.to_owned(); redis_client - .xadd( - crate::redis::generate_historical_stream_key(&indexer.get_full_name()), - &[("block_height".to_string(), block)], - ) + .xadd(redis_stream.clone(), &[("block_height".to_string(), block)]) .await .context("Failed to add block to Redis Stream")?; redis_client @@ -304,6 +311,7 @@ mod tests { lake_s3_config, &ChainId::Mainnet, 1, + "stream key".to_string(), ) .await .unwrap(); diff --git a/block-streamer/src/server/block_streamer_service.rs b/block-streamer/src/server/block_streamer_service.rs index 0ec41948d..829ec4876 100644 --- a/block-streamer/src/server/block_streamer_service.rs +++ b/block-streamer/src/server/block_streamer_service.rs @@ -113,6 +113,7 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic indexer_config.clone(), self.chain_id.clone(), request.version, + request.redis_stream, ); block_stream @@ -240,6 +241,7 @@ mod tests { account_id: "morgs.near".to_string(), function_name: "test".to_string(), version: 0, + redis_stream: "stream".to_string(), rule: Some(start_stream_request::Rule::ActionAnyRule(ActionAnyRule { affected_account_id: "queryapi.dataplatform.near".to_string(), status: 1, @@ -273,6 +275,7 @@ mod tests { account_id: "morgs.near".to_string(), function_name: "test".to_string(), version: 0, + redis_stream: "stream".to_string(), rule: Some(start_stream_request::Rule::ActionAnyRule(ActionAnyRule { affected_account_id: "queryapi.dataplatform.near".to_string(), status: 1, diff --git a/coordinator/src/block_streams_handler.rs b/coordinator/src/block_streams_handler.rs index 2fc9a1ce8..a62e9b20d 100644 --- a/coordinator/src/block_streams_handler.rs +++ b/coordinator/src/block_streams_handler.rs @@ -61,6 +61,7 @@ impl BlockStreamsHandlerImpl { account_id: String, function_name: String, version: u64, + redis_stream: String, rule: registry_types::MatchingRule, ) -> anyhow::Result<()> { let rule = match &rule { @@ -93,6 +94,7 @@ impl BlockStreamsHandlerImpl { account_id, function_name, version, + redis_stream, rule: Some(rule), }); diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 53f707598..7d2d288e6 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -134,6 +134,7 @@ async fn synchronise_block_streams( indexer_config.account_id.to_string(), indexer_config.function_name.clone(), registry_version, + indexer_config.get_redis_stream(), indexer_config.filter.matching_rule.clone(), ) .await?; @@ -386,12 +387,13 @@ mod tests { predicate::eq("morgs.near".to_string()), predicate::eq("test".to_string()), predicate::eq(1), + predicate::eq("morgs.near/test:block_stream".to_string()), predicate::eq(MatchingRule::ActionAny { affected_account_id: "queryapi.dataplatform.near".to_string(), status: Status::Any, }), ) - .returning(|_, _, _, _, _| Ok(())); + .returning(|_, _, _, _, _, _| Ok(())); synchronise_block_streams(&indexer_registry, &redis_client, &mut block_stream_handler) .await @@ -439,12 +441,13 @@ mod tests { predicate::eq("morgs.near".to_string()), predicate::eq("test".to_string()), predicate::eq(200), + predicate::eq("morgs.near/test:block_stream".to_string()), predicate::eq(MatchingRule::ActionAny { affected_account_id: "queryapi.dataplatform.near".to_string(), status: Status::Any, }), ) - .returning(|_, _, _, _, _| Ok(())); + .returning(|_, _, _, _, _, _| Ok(())); synchronise_block_streams(&indexer_registry, &redis_client, &mut block_stream_handler) .await @@ -492,12 +495,13 @@ mod tests { predicate::eq("morgs.near".to_string()), predicate::eq("test".to_string()), predicate::eq(101), + predicate::eq("morgs.near/test:block_stream".to_string()), predicate::eq(MatchingRule::ActionAny { affected_account_id: "queryapi.dataplatform.near".to_string(), status: Status::Any, }), ) - .returning(|_, _, _, _, _| Ok(())); + .returning(|_, _, _, _, _, _| Ok(())); synchronise_block_streams(&indexer_registry, &redis_client, &mut block_stream_handler) .await @@ -635,12 +639,13 @@ mod tests { predicate::eq("morgs.near".to_string()), predicate::eq("test".to_string()), predicate::eq(200), + predicate::eq("morgs.near/test:block_stream".to_string()), predicate::eq(MatchingRule::ActionAny { affected_account_id: "queryapi.dataplatform.near".to_string(), status: Status::Any, }), ) - .returning(|_, _, _, _, _| Ok(())); + .returning(|_, _, _, _, _, _| Ok(())); synchronise_block_streams(&indexer_registry, &redis_client, &mut block_stream_handler) .await