From 2d570f01e4fe8b491514e10d04a189cb101a321f Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Mon, 4 Dec 2023 13:56:50 +1300 Subject: [PATCH] fix: Prevent duplicate block streams from starting --- .../src/server/block_streamer_service.rs | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/block-streamer/src/server/block_streamer_service.rs b/block-streamer/src/server/block_streamer_service.rs index 4e9cbe67c..79fd1b541 100644 --- a/block-streamer/src/server/block_streamer_service.rs +++ b/block-streamer/src/server/block_streamer_service.rs @@ -45,6 +45,14 @@ impl BlockStreamerService { block_streams: Mutex::new(HashMap::new()), } } + + fn get_block_streams_lock( + &self, + ) -> Result>, Status> { + self.block_streams + .lock() + .map_err(|err| Status::internal(format!("Failed to acquire lock: {}", err))) + } } #[tonic::async_trait] @@ -98,6 +106,12 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic indexer_rule: filter_rule, }; + let lock = self.get_block_streams_lock()?; + match lock.get(&indexer_config.get_hash_id()) { + Some(_) => return Err(Status::already_exists("Block stream already exists")), + None => drop(lock), + } + let mut block_stream = block_stream::BlockStream::new(indexer_config.clone(), self.chain_id.clone()); @@ -107,12 +121,9 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic self.redis_connection_manager.clone(), self.delta_lake_client.clone(), ) - .map_err(|_| Status::already_exists("Block stream already exists"))?; + .map_err(|_| Status::internal("Failed to start block stream"))?; - let mut lock = self - .block_streams - .lock() - .map_err(|err| Status::internal(format!("Failed to acquire lock: {}", err)))?; + let mut lock = self.get_block_streams_lock()?; lock.insert(indexer_config.get_hash_id(), block_stream); Ok(Response::new(blockstreamer::StartStreamResponse { @@ -129,10 +140,7 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic let stream_id = request.stream_id; let exising_block_stream = { - let mut lock = self - .block_streams - .lock() - .map_err(|err| Status::internal(format!("Failed to acquire lock: {}", err)))?; + let mut lock = self.get_block_streams_lock()?; lock.remove(&stream_id) };