From 208b224ef8b55949869164e2b861dd698f080651 Mon Sep 17 00:00:00 2001 From: Morgan McCauley Date: Thu, 9 Nov 2023 15:18:22 +1300 Subject: [PATCH] fix: Ensure Historical Stream is cleared before pushing new messages (#375) On cancelling an existing Historical Process the Redis Stream is cleared/deleted, but this is only the case if the async task is still active and the cancellation token is consumed. This leads to the following potential problems: - When restarting Coordinator, if any Historical Processes are currently active, the task and cancellation token will be lost, meaning the new process will start without clearing the existing Stream - When any branch within `tokio::select!` completes, all others are cancelled. Therefore, if the Historical Process finishes successfully, the cancellation branch will never be executed, meaning the Stream won't be cleared. This becomes a problem if the Redis Stream still contains messages when the next process is kicked off. This PR updates cancellation so that the Stream is _always_ cleared when a new Historical Process is kicked off. --- .../src/historical_block_processing.rs | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/indexer/queryapi_coordinator/src/historical_block_processing.rs b/indexer/queryapi_coordinator/src/historical_block_processing.rs index f091c3f86..82118aea3 100644 --- a/indexer/queryapi_coordinator/src/historical_block_processing.rs +++ b/indexer/queryapi_coordinator/src/historical_block_processing.rs @@ -18,7 +18,7 @@ pub const MAX_UNINDEXED_BLOCKS_TO_PROCESS: u64 = 7200; // two hours of blocks ta pub const MAX_RPC_BLOCKS_TO_PROCESS: u8 = 20; pub struct Task { - handle: JoinHandle>, + handle: JoinHandle<()>, cancellation_token: tokio_util::sync::CancellationToken, } @@ -51,11 +51,11 @@ impl Streamer { let handle = tokio::spawn(async move { tokio::select! { _ = cancellation_token_clone.cancelled() => { - storage::del( - &redis_connection_manager, - storage::generate_historical_stream_key(&indexer.get_full_name()), - ) - .await + tracing::info!( + target: crate::INDEXER, + "Cancelling existing historical backfill for indexer: {:?}", + indexer.get_full_name(), + ); }, _ = process_historical_messages_or_handle_error( current_block_height, @@ -64,9 +64,7 @@ impl Streamer { &s3_client, &chain_id, &json_rpc_client, - ) => { - Ok(()) - } + ) => { } } }); @@ -81,7 +79,7 @@ impl Streamer { pub async fn cancel(&mut self) -> anyhow::Result<()> { if let Some(task) = self.task.take() { task.cancellation_token.cancel(); - task.handle.await??; + task.handle.await?; return Ok(()); } @@ -185,6 +183,11 @@ pub(crate) async fn process_historical_messages( blocks_from_index.append(&mut blocks_between_indexed_and_current_block); if !blocks_from_index.is_empty() { + storage::del( + redis_connection_manager, + storage::generate_historical_stream_key(&indexer_function.get_full_name()), + ) + .await?; storage::sadd( redis_connection_manager, storage::STREAMS_SET_KEY,