Skip to content

Commit

Permalink
fix: Prevent duplicate block streams from starting
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Dec 14, 2023
1 parent a7b9f36 commit 2d570f0
Showing 1 changed file with 17 additions and 9 deletions.
26 changes: 17 additions & 9 deletions block-streamer/src/server/block_streamer_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ impl BlockStreamerService {
block_streams: Mutex::new(HashMap::new()),
}
}

fn get_block_streams_lock(
&self,
) -> Result<std::sync::MutexGuard<HashMap<String, block_stream::BlockStream>>, Status> {
self.block_streams
.lock()
.map_err(|err| Status::internal(format!("Failed to acquire lock: {}", err)))
}
}

#[tonic::async_trait]
Expand Down Expand Up @@ -98,6 +106,12 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic
indexer_rule: filter_rule,
};

let lock = self.get_block_streams_lock()?;
match lock.get(&indexer_config.get_hash_id()) {
Some(_) => return Err(Status::already_exists("Block stream already exists")),
None => drop(lock),
}

let mut block_stream =
block_stream::BlockStream::new(indexer_config.clone(), self.chain_id.clone());

Expand All @@ -107,12 +121,9 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic
self.redis_connection_manager.clone(),
self.delta_lake_client.clone(),
)
.map_err(|_| Status::already_exists("Block stream already exists"))?;
.map_err(|_| Status::internal("Failed to start block stream"))?;

let mut lock = self
.block_streams
.lock()
.map_err(|err| Status::internal(format!("Failed to acquire lock: {}", err)))?;
let mut lock = self.get_block_streams_lock()?;
lock.insert(indexer_config.get_hash_id(), block_stream);

Ok(Response::new(blockstreamer::StartStreamResponse {
Expand All @@ -129,10 +140,7 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic
let stream_id = request.stream_id;

let exising_block_stream = {
let mut lock = self
.block_streams
.lock()
.map_err(|err| Status::internal(format!("Failed to acquire lock: {}", err)))?;
let mut lock = self.get_block_streams_lock()?;
lock.remove(&stream_id)
};

Expand Down

0 comments on commit 2d570f0

Please sign in to comment.