Skip to content

Commit

Permalink
fix: Ensure Historical Stream is cleared before pushing new messages (#…
Browse files Browse the repository at this point in the history
…375)

On cancelling an existing Historical Process the Redis Stream is
cleared/deleted, but this is only the case if the async task is still
active and the cancellation token is consumed. This leads to the
following potential problems:
- When restarting Coordinator, if any Historical Processes are currently
active, the task and cancellation token will be lost, meaning the new
process will start without clearing the existing Stream
- When any branch within `tokio::select!` completes, all others are
cancelled. Therefore, if the Historical Process finishes successfully,
the cancellation branch will never be executed, meaning the Stream won't
be cleared. This becomes a problem if the Redis Stream still contains
messages when the next process is kicked off.

This PR updates cancellation so that the Stream is _always_ cleared when
a new Historical Process is kicked off.
  • Loading branch information
morgsmccauley authored Nov 9, 2023
1 parent 262b183 commit 208b224
Showing 1 changed file with 13 additions and 10 deletions.
23 changes: 13 additions & 10 deletions indexer/queryapi_coordinator/src/historical_block_processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub const MAX_UNINDEXED_BLOCKS_TO_PROCESS: u64 = 7200; // two hours of blocks ta
pub const MAX_RPC_BLOCKS_TO_PROCESS: u8 = 20;

pub struct Task {
handle: JoinHandle<anyhow::Result<()>>,
handle: JoinHandle<()>,
cancellation_token: tokio_util::sync::CancellationToken,
}

Expand Down Expand Up @@ -51,11 +51,11 @@ impl Streamer {
let handle = tokio::spawn(async move {
tokio::select! {
_ = cancellation_token_clone.cancelled() => {
storage::del(
&redis_connection_manager,
storage::generate_historical_stream_key(&indexer.get_full_name()),
)
.await
tracing::info!(
target: crate::INDEXER,
"Cancelling existing historical backfill for indexer: {:?}",
indexer.get_full_name(),
);
},
_ = process_historical_messages_or_handle_error(
current_block_height,
Expand All @@ -64,9 +64,7 @@ impl Streamer {
&s3_client,
&chain_id,
&json_rpc_client,
) => {
Ok(())
}
) => { }
}
});

Expand All @@ -81,7 +79,7 @@ impl Streamer {
pub async fn cancel(&mut self) -> anyhow::Result<()> {
if let Some(task) = self.task.take() {
task.cancellation_token.cancel();
task.handle.await??;
task.handle.await?;

return Ok(());
}
Expand Down Expand Up @@ -185,6 +183,11 @@ pub(crate) async fn process_historical_messages(
blocks_from_index.append(&mut blocks_between_indexed_and_current_block);

if !blocks_from_index.is_empty() {
storage::del(
redis_connection_manager,
storage::generate_historical_stream_key(&indexer_function.get_full_name()),
)
.await?;
storage::sadd(
redis_connection_manager,
storage::STREAMS_SET_KEY,
Expand Down

0 comments on commit 208b224

Please sign in to comment.