Skip to content

Commit

Permalink
refactor: Push executor sync logic up to lifecycle manager
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Aug 12, 2024
1 parent 73b7e70 commit 7fc6666
Show file tree
Hide file tree
Showing 2 changed files with 291 additions and 100 deletions.
185 changes: 92 additions & 93 deletions coordinator/src/handlers/executors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,18 @@ use crate::redis::KeyProvider;

const RESTART_TIMEOUT_SECONDS: u64 = 600;

#[derive(Debug, PartialEq)]
pub enum ExecutorStatus {
/// Executor is running as expected
Active,
/// Executor is in an unhealthy state
Unhealthy,
/// Executor
Inactive,
/// Executor is not synchronized with the latest config
Outdated,
}

#[cfg(not(test))]
use ExecutorsClientWrapperImpl as ExecutorsClientWrapper;
#[cfg(test)]
Expand Down Expand Up @@ -148,56 +160,40 @@ impl ExecutorsHandlerImpl {
Ok(())
}

async fn ensure_healthy(
&self,
config: &IndexerConfig,
executor: ExecutorInfo,
) -> anyhow::Result<()> {
fn is_healthy(&self, executor: ExecutorInfo) -> bool {
if let Some(health) = executor.health {
if !matches!(
return !matches!(
health.execution_state.try_into(),
Ok(ExecutionState::Stalled)
) {
return Ok(());
}
);
}

tracing::info!("Restarting stalled executor after {RESTART_TIMEOUT_SECONDS} seconds");

self.stop(executor.executor_id).await?;
tokio::time::sleep(tokio::time::Duration::from_secs(RESTART_TIMEOUT_SECONDS)).await;
self.start(config).await?;

Ok(())
false
}

pub async fn synchronise(&self, config: &IndexerConfig) -> anyhow::Result<()> {
pub async fn get_status(&self, config: &IndexerConfig) -> anyhow::Result<ExecutorStatus> {
let executor = self
.get(config.account_id.clone(), config.function_name.clone())
.await?;

if let Some(executor) = executor {
if executor.version == config.get_registry_version() {
self.ensure_healthy(config, executor).await?;
return Ok(());
if executor.version != config.get_registry_version() {
return Ok(ExecutorStatus::Outdated);
}

tracing::info!(
account_id = config.account_id.as_str(),
function_name = config.function_name,
version = executor.version,
"Stopping outdated executor"
);
if !self.is_healthy(executor) {
return Ok(ExecutorStatus::Unhealthy);
}

self.stop(executor.executor_id).await?;
return Ok(ExecutorStatus::Active);
}

tracing::info!(
account_id = config.account_id.as_str(),
function_name = config.function_name,
version = config.get_registry_version(),
"Starting executor"
);
Ok(ExecutorStatus::Inactive)
}

pub async fn restart(&self, config: &IndexerConfig) -> anyhow::Result<()> {
self.stop_if_needed(config.account_id.clone(), config.function_name.clone())
.await?;

self.start(config).await?;

Expand Down Expand Up @@ -238,18 +234,63 @@ mod tests {
}

#[tokio::test]
async fn resumes_stopped_executors() {
async fn returns_executor_status() {
let config = IndexerConfig::default();
let test_cases = [
(
Some(ExecutorInfo {
version: config.get_registry_version(),
health: None,
..Default::default()
}),
ExecutorStatus::Unhealthy,
),
(None, ExecutorStatus::Inactive),
(
Some(ExecutorInfo {
version: config.get_registry_version() - 1,
..Default::default()
}),
ExecutorStatus::Outdated,
),
(
Some(ExecutorInfo {
version: config.get_registry_version(),
health: Some(runner::Health {
execution_state: runner::ExecutionState::Running.into(),
}),
..Default::default()
}),
ExecutorStatus::Active,
),
];

for (executor, expected_status) in test_cases {
let mut mock_client = ExecutorsClientWrapper::default();
mock_client
.expect_get_executor::<GetExecutorRequest>()
.with(always())
.returning(move |_| {
if let Some(executor) = executor.clone() {
Ok(Response::new(executor))
} else {
Err(tonic::Status::not_found("not found"))
}
});

let handler = ExecutorsHandlerImpl {
client: mock_client,
};

assert_eq!(handler.get_status(&config).await.unwrap(), expected_status);
}
}

#[tokio::test]
async fn starts_executors() {
let config = IndexerConfig::default();

let mut mock_client = ExecutorsClientWrapper::default();
mock_client
.expect_get_executor::<GetExecutorRequest>()
.with(eq(GetExecutorRequest {
account_id: config.account_id.to_string(),
function_name: config.function_name.clone(),
}))
.returning(|_| Err(tonic::Status::not_found("not found")))
.once();
mock_client
.expect_start_executor::<StartExecutorRequest>()
.with(eq(StartExecutorRequest {
Expand All @@ -271,11 +312,11 @@ mod tests {
client: mock_client,
};

handler.synchronise(&config).await.unwrap()
handler.start(&config).await.unwrap()
}

#[tokio::test]
async fn reconfigures_outdated_executors() {
async fn restarts_executors() {
let config = IndexerConfig::default();

let executor = ExecutorInfo {
Expand Down Expand Up @@ -324,11 +365,11 @@ mod tests {
client: mock_client,
};

handler.synchronise(&config).await.unwrap()
handler.restart(&config).await.unwrap()
}

#[tokio::test]
async fn restarts_unhealthy_executors() {
async fn unhealthy_executor() {
tokio::time::pause();

let config = IndexerConfig::default();
Expand All @@ -343,49 +384,17 @@ mod tests {
}),
};

let mut mock_client = ExecutorsClientWrapper::default();
mock_client
.expect_stop_executor::<StopExecutorRequest>()
.with(eq(StopExecutorRequest {
executor_id: executor.executor_id.clone(),
}))
.returning(|_| {
Ok(Response::new(StopExecutorResponse {
executor_id: "executor_id".to_string(),
}))
})
.once();
mock_client
.expect_start_executor::<StartExecutorRequest>()
.with(eq(StartExecutorRequest {
code: config.code.clone(),
schema: config.schema.clone(),
redis_stream: config.get_redis_stream_key(),
version: config.get_registry_version(),
account_id: config.account_id.to_string(),
function_name: config.function_name.clone(),
}))
.returning(|_| {
Ok(tonic::Response::new(StartExecutorResponse {
executor_id: "executor_id".to_string(),
}))
})
.once();
mock_client
.expect_get_executor::<GetExecutorRequest>()
.with(always())
.returning(move |_| Ok(Response::new(executor.clone())))
.once();
let mock_client = ExecutorsClientWrapper::default();

let handler = ExecutorsHandlerImpl {
client: mock_client,
};

handler.synchronise(&config).await.unwrap()
assert!(!handler.is_healthy(executor));
}

#[tokio::test]
async fn ignores_healthy_executors() {
async fn healthy_executors() {
tokio::time::pause();

let config = IndexerConfig::default();
Expand All @@ -408,23 +417,13 @@ mod tests {
}),
};

let mut mock_client = ExecutorsClientWrapper::default();
mock_client
.expect_stop_executor::<StopExecutorRequest>()
.never();
mock_client
.expect_start_executor::<StartExecutorRequest>()
.never();
mock_client
.expect_get_executor::<GetExecutorRequest>()
.with(always())
.returning(move |_| Ok(Response::new(executor.clone())));
let mock_client = ExecutorsClientWrapper::default();

let handler = ExecutorsHandlerImpl {
client: mock_client,
};

handler.synchronise(&config).await.unwrap()
assert!(handler.is_healthy(executor));
}
}
}
Loading

0 comments on commit 7fc6666

Please sign in to comment.