Skip to content

Commit

Permalink
Add "health_check" to EcsTaskDefinitionConfig (#20663)
Browse files Browse the repository at this point in the history
Summary:
Gives us a place to stick a container-level health check command on code
servers in ECS.

Test Plan: BK
  • Loading branch information
gibsondan authored and PedramNavid committed Mar 28, 2024
1 parent afcef23 commit 2fb2a6c
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,11 @@
is_required=False,
description="Additional sidecar containers to include in run task definitions.",
),
"server_health_check": Field(
Permissive(),
is_required=False,
description="Health check to include in code server task definitions.",
),
**SHARED_TASK_DEFINITION_FIELDS,
**SHARED_ECS_SCHEMA,
}
Expand All @@ -241,6 +246,7 @@ class EcsContainerContext(
("server_ecs_tags", Sequence[Mapping[str, Optional[str]]]),
("run_ecs_tags", Sequence[Mapping[str, Optional[str]]]),
("repository_credentials", Optional[str]),
("server_health_check", Optional[Mapping[str, Any]]),
],
)
):
Expand All @@ -265,6 +271,7 @@ def __new__(
server_ecs_tags: Optional[Sequence[Mapping[str, Optional[str]]]] = None,
run_ecs_tags: Optional[Sequence[Mapping[str, Optional[str]]]] = None,
repository_credentials: Optional[str] = None,
server_health_check: Optional[Mapping[str, Any]] = None,
):
return super(EcsContainerContext, cls).__new__(
cls,
Expand Down Expand Up @@ -293,6 +300,7 @@ def __new__(
repository_credentials=check.opt_str_param(
repository_credentials, "repository_credentials"
),
server_health_check=check.opt_mapping_param(server_health_check, "server_health_check"),
)

def merge(self, other: "EcsContainerContext") -> "EcsContainerContext":
Expand All @@ -317,6 +325,7 @@ def merge(self, other: "EcsContainerContext") -> "EcsContainerContext":
server_ecs_tags=[*other.server_ecs_tags, *self.server_ecs_tags],
run_ecs_tags=[*other.run_ecs_tags, *self.run_ecs_tags],
repository_credentials=other.repository_credentials or self.repository_credentials,
server_health_check=other.server_health_check or self.server_health_check,
)

def get_secrets_dict(self, secrets_manager) -> Mapping[str, str]:
Expand Down Expand Up @@ -410,5 +419,6 @@ def create_from_config(run_container_context) -> "EcsContainerContext":
server_ecs_tags=processed_context_value.get("server_ecs_tags"),
run_ecs_tags=processed_context_value.get("run_ecs_tags"),
repository_credentials=processed_context_value.get("repository_credentials"),
server_health_check=processed_context_value.get("server_health_check"),
)
)
10 changes: 10 additions & 0 deletions python_modules/libraries/dagster-aws/dagster_aws/ecs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class DagsterEcsTaskDefinitionConfig(
("volumes", Sequence[Mapping[str, Any]]),
("repository_credentials", Optional[str]),
("linux_parameters", Optional[Mapping[str, Any]]),
("health_check", Optional[Mapping[str, Any]]),
],
)
):
Expand Down Expand Up @@ -58,6 +59,7 @@ def __new__(
volumes: Optional[Sequence[Mapping[str, Any]]] = None,
repository_credentials: Optional[str] = None,
linux_parameters: Optional[Mapping[str, Any]] = None,
health_check: Optional[Mapping[str, Any]] = None,
):
return super(DagsterEcsTaskDefinitionConfig, cls).__new__(
cls,
Expand All @@ -80,6 +82,7 @@ def __new__(
check.opt_sequence_param(volumes, "volumes"),
check.opt_str_param(repository_credentials, "repository_credentials"),
check.opt_mapping_param(linux_parameters, "linux_parameters"),
check.opt_mapping_param(health_check, "health_check"),
)

def task_definition_dict(self):
Expand Down Expand Up @@ -112,6 +115,7 @@ def task_definition_dict(self):
else {}
),
({"linuxParameters": self.linux_parameters} if self.linux_parameters else {}),
({"healthCheck": self.health_check} if self.health_check else {}),
),
*self.sidecars,
],
Expand Down Expand Up @@ -177,6 +181,7 @@ def from_task_definition_dict(task_definition_dict, container_name):
"credentialsParameter"
),
linux_parameters=container_definition.get("linuxParameters"),
health_check=container_definition.get("healthCheck"),
)


Expand Down Expand Up @@ -233,6 +238,11 @@ def get_task_definition_dict_from_current_task(
)
)

# Don't automatically include health check - may be specific to the current task
container_definition = {
key: val for key, val in container_definition.items() if key != "healthCheck"
}

# Start with the current process's task's definition but remove
# extra keys that aren't useful for creating a new task definition
# (status, revision, etc.)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,9 @@ def container_context_config(configured_secret: Secret) -> Mapping[str, Any]:
],
"run_ecs_tags": [{"key": "ABC", "value": "DEF"}], # with value
"repository_credentials": "fake-secret-arn",
"server_health_check": {
"command": ["HELLO"],
},
},
}

Expand Down Expand Up @@ -540,6 +543,13 @@ def other_container_context_config(other_configured_secret):
}
],
"repository_credentials": "fake-secret-arn",
"server_health_check": {
"command": ["CMD-SHELL", "curl -f http://localhost/ || exit 1"],
"interval": 30,
"timeout": 5,
"retries": 3,
"startPeriod": 0,
},
},
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ def test_merge(
],
},
]
assert merged.server_health_check == other_secrets_container_context.server_health_check
assert merged.run_sidecar_containers == [
{
"name": "OtherRunAgent",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,9 @@ def test_reuse_task_definition(instance, ecs):
"secrets": secrets,
"environment": environment,
"command": ["echo", "HELLO"],
"healthCheck": {
"command": ["HELLO"],
},
},
{
"image": "other_image",
Expand Down Expand Up @@ -494,6 +497,20 @@ def test_reuse_task_definition(instance, ecs):
container_name,
)

# Changed healthCheck fails
task_definition = copy.deepcopy(original_task_definition)
task_definition["containerDefinitions"][0]["healthCheck"] = {
"command": ["CMD-SHELL", "curl -f http://localhost/ || exit 1"],
"interval": 30,
"timeout": 5,
"retries": 3,
"startPeriod": 0,
}
assert not instance.run_launcher._reuse_task_definition( # noqa: SLF001
DagsterEcsTaskDefinitionConfig.from_task_definition_dict(task_definition, container_name),
container_name,
)

# Any other diff passes
task_definition = copy.deepcopy(original_task_definition)
task_definition["somethingElse"] = "boom"
Expand Down

0 comments on commit 2fb2a6c

Please sign in to comment.