fix: Add timeout between restarts of stalled resources (#932)
Add a timeout before restarting a stalled block stream or executor, so that repeated restarts do not keep consuming resources such as Hasura/Postgres connections through whatever work the resource performs before hitting the condition that caused it to crash.
darunrs authored Jul 26, 2024
1 parent 9133af4 commit a640ced
Showing 3 changed files with 11 additions and 4 deletions.
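
The restart path in both handlers now follows the same stop, wait, start sequence. As a rough illustration (not code from this repository), the sketch below shows that pattern on a hypothetical Resource stand-in for BlockStreamsHandler / ExecutorsHandler, assuming a tokio runtime with the macros and time features enabled:

use std::time::Duration;

// Mirrors the constant added to block_streams.rs and executors.rs.
const RESTART_TIMEOUT_SECONDS: u64 = 600;

// Hypothetical stand-in for a resource the coordinator can stop and start via RPC.
struct Resource {
    name: String,
}

impl Resource {
    async fn stop(&self) -> Result<(), String> {
        println!("stopping {}", self.name);
        Ok(())
    }

    async fn start(&self) -> Result<(), String> {
        println!("starting {}", self.name);
        Ok(())
    }
}

// Stop the stalled resource first so it releases e.g. Hasura/Postgres
// connections, wait out the timeout so a crash-loop cannot hold them
// continuously, then start it again.
async fn restart_stalled(resource: &Resource) -> Result<(), String> {
    resource.stop().await?;
    tokio::time::sleep(Duration::from_secs(RESTART_TIMEOUT_SECONDS)).await;
    resource.start().await
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let block_stream = Resource { name: "block stream".to_string() };
    restart_stalled(&block_stream).await
}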
coordinator/src/handlers/block_streams.rs (8 changes: 6 additions & 2 deletions)
@@ -19,6 +19,8 @@ use crate::indexer_config::IndexerConfig;
 use crate::redis::{KeyProvider, RedisClient};
 use crate::utils::exponential_retry;
 
+const RESTART_TIMEOUT_SECONDS: u64 = 600;
+
 #[derive(Clone)]
 pub struct BlockStreamsHandler {
     client: BlockStreamerClient<Channel>,
@@ -258,11 +260,13 @@ impl BlockStreamsHandler
                 tracing::info!(stale, stalled, "Restarting stalled block stream");
             }
         } else {
-            tracing::info!("Restarting stalled block stream");
+            tracing::info!(
+                "Restarting stalled block stream after {RESTART_TIMEOUT_SECONDS} seconds"
+            );
         }
 
         self.stop(block_stream.stream_id.clone()).await?;
-
+        tokio::time::sleep(tokio::time::Duration::from_secs(RESTART_TIMEOUT_SECONDS)).await;
         let height = self.get_continuation_block_height(config).await?;
         self.start(height, config).await?;
 
coordinator/src/handlers/data_layer.rs (2 changes: 1 addition & 1 deletion)
@@ -14,7 +14,7 @@ use crate::indexer_config::IndexerConfig;

 type TaskId = String;
 
-const TASK_TIMEOUT_SECONDS: u64 = 300; // 5 minutes
+const TASK_TIMEOUT_SECONDS: u64 = 600; // 10 minutes
 
 #[derive(Clone)]
 pub struct DataLayerHandler {
coordinator/src/handlers/executors.rs (5 changes: 4 additions & 1 deletion)
@@ -16,6 +16,8 @@ use crate::indexer_config::IndexerConfig;
 use crate::redis::KeyProvider;
 use crate::utils::exponential_retry;
 
+const RESTART_TIMEOUT_SECONDS: u64 = 600;
+
 #[derive(Clone)]
 pub struct ExecutorsHandler {
     client: RunnerClient<Channel>,
@@ -136,9 +138,10 @@ impl ExecutorsHandler
             }
         }
 
-        tracing::info!("Restarting stalled executor");
+        tracing::info!("Restarting stalled executor after {RESTART_TIMEOUT_SECONDS} seconds");
 
         self.stop(executor.executor_id).await?;
+        tokio::time::sleep(tokio::time::Duration::from_secs(RESTART_TIMEOUT_SECONDS)).await;
         self.start(config).await?;
 
         Ok(())
