Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding startupProbe to airflow services charts #33107

Merged
merged 2 commits into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions chart/templates/_helpers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,39 @@ server_tls_key_file = /etc/pgbouncer/server.key
{{- end }}
{{- end }}


{{/*
scheduler_startup_check_command

Emits the YAML list items of the default exec command used by the scheduler
container's startupProbe (included when no custom
.Values.scheduler.startupProbe.command is supplied).

The command is selected from .Values.airflowVersion:
  * >= 2.5.0 : `airflow jobs check --job-type SchedulerJob --local`
  * >= 2.1.0 : `airflow jobs check --job-type SchedulerJob --hostname $(hostname)`
  * older    : an inline Python script that looks up the SchedulerJob row with
               the most recent heartbeat for this hostname and exits 0 only
               when that job reports is_alive().

Every variant runs through /entrypoint with CONNECTION_CHECK_MAX_COUNT=0 and
with the Airflow logging level set to ERROR.
*/}}
{{- define "scheduler_startup_check_command" }}
{{- if semverCompare ">=2.5.0" .Values.airflowVersion }}
- sh
- -c
- |
CONNECTION_CHECK_MAX_COUNT=0 AIRFLOW__LOGGING__LOGGING_LEVEL=ERROR exec /entrypoint \
airflow jobs check --job-type SchedulerJob --local
{{- else if semverCompare ">=2.1.0" .Values.airflowVersion }}
- sh
- -c
- |
CONNECTION_CHECK_MAX_COUNT=0 AIRFLOW__LOGGING__LOGGING_LEVEL=ERROR exec /entrypoint \
airflow jobs check --job-type SchedulerJob --hostname $(hostname)
{{- else }}
- sh
- -c
- |
CONNECTION_CHECK_MAX_COUNT=0 exec /entrypoint python -Wignore -c "
import os
os.environ['AIRFLOW__CORE__LOGGING_LEVEL'] = 'ERROR'
os.environ['AIRFLOW__LOGGING__LOGGING_LEVEL'] = 'ERROR'
from airflow.jobs.scheduler_job import SchedulerJob
from airflow.utils.db import create_session
from airflow.utils.net import get_hostname
import sys
with create_session() as session:
job = session.query(SchedulerJob).filter_by(hostname=get_hostname()).order_by(
SchedulerJob.latest_heartbeat.desc()).limit(1).first()
sys.exit(0 if job.is_alive() else 1)"
{{- end }}
{{- end }}

{{- define "triggerer_liveness_check_command" }}
{{- if semverCompare ">=2.5.0" .Values.airflowVersion }}
- sh
Expand Down
8 changes: 4 additions & 4 deletions chart/templates/configmaps/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,16 @@ data:
{{- .Values.dags.gitSync.knownHosts | nindent 4 }}
{{- end }}

{{- if or (eq $.Values.executor "LocalKubernetesExecutor") (eq $.Values.executor "KubernetesExecutor") (eq $.Values.executor "CeleryKubernetesExecutor") }}
{{- if semverCompare ">=1.10.12" .Values.airflowVersion }}
{{/* {{- if or (eq $.Values.executor "LocalKubernetesExecutor") (eq $.Values.executor "KubernetesExecutor") (eq $.Values.executor "CeleryKubernetesExecutor") }}*/}}
{{/* {{- if semverCompare ">=1.10.12" .Values.airflowVersion }}*/}}
pod_template_file.yaml: |-
{{- if .Values.podTemplate }}
{{- tpl .Values.podTemplate . | nindent 4 }}
{{- else }}
{{- tpl (.Files.Get "files/pod-template-file.kubernetes-helm-yaml") . | nindent 4 }}
{{- end }}
{{- end }}
{{- end }}
{{/* {{- end }}*/}}
{{/* {{- end }}*/}}

{{- if .Values.kerberos.enabled }}
krb5.conf: |-
Expand Down
11 changes: 11 additions & 0 deletions chart/templates/scheduler/scheduler-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,17 @@ spec:
{{- else }}
{{- include "scheduler_liveness_check_command" . | indent 14 }}
{{- end }}
startupProbe:
timeoutSeconds: {{ .Values.scheduler.startupProbe.timeoutSeconds }}
failureThreshold: {{ .Values.scheduler.startupProbe.failureThreshold }}
periodSeconds: {{ .Values.scheduler.startupProbe.periodSeconds }}
exec:
command:
{{- if .Values.scheduler.startupProbe.command }}
{{- toYaml .Values.scheduler.startupProbe.command | nindent 16 }}
{{- else }}
{{- include "scheduler_startup_check_command" . | indent 14 }}
{{- end }}
{{- if and $local (not $elasticsearch) }}
# Serve logs if we're in local mode and we don't have elasticsearch enabled.
ports:
Expand Down
13 changes: 13 additions & 0 deletions chart/templates/webserver/webserver-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,19 @@ spec:
timeoutSeconds: {{ .Values.webserver.readinessProbe.timeoutSeconds }}
failureThreshold: {{ .Values.webserver.readinessProbe.failureThreshold }}
periodSeconds: {{ .Values.webserver.readinessProbe.periodSeconds }}
startupProbe:
httpGet:
path: {{ if .Values.config.webserver.base_url }}{{- with urlParse (tpl .Values.config.webserver.base_url .) }}{{ .path }}{{ end }}{{ end }}/health
port: {{ .Values.ports.airflowUI }}
{{- if .Values.config.webserver.base_url}}
httpHeaders:
- name: Host
value: {{ regexReplaceAll ":\\d+$" (urlParse (tpl .Values.config.webserver.base_url .)).host "" }}
{{- end }}
scheme: {{ .Values.webserver.startupProbe.scheme | default "http" }}
timeoutSeconds: {{ .Values.webserver.startupProbe.timeoutSeconds }}
failureThreshold: {{ .Values.webserver.startupProbe.failureThreshold }}
periodSeconds: {{ .Values.webserver.startupProbe.periodSeconds }}
envFrom: {{- include "custom_airflow_environment_from" . | default "\n []" | indent 10 }}
env:
{{- include "custom_airflow_environment" . | indent 10 }}
Expand Down
62 changes: 62 additions & 0 deletions chart/values.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1960,6 +1960,41 @@
}
}
},
"startupProbe": {
"description": "Startup probe configuration for scheduler container.",
"type": "object",
"additionalProperties": false,
"properties": {
"timeoutSeconds": {
"description": "Number of seconds after which the probe times out. Minimum value is 1 seconds.",
"type": "integer",
"default": 20
},
"failureThreshold": {
"description": "Minimum consecutive failures for the probe to be considered failed after having succeeded. Minimum value is 1.",
"type": "integer",
"default": 6
},
"periodSeconds": {
"description": "How often (in seconds) to perform the probe. Minimum value is 1.",
"type": "integer",
"default": 10
},
"command": {
"description": "Command for livenessProbe",
"type": [
"array",
"null"
],
"items": {
"type": [
"string",
"null"
]
}
}
}
},
"replicas": {
"description": "Airflow 2.0 allows users to run multiple schedulers. This feature is only recommended for MySQL 8+ and PostgreSQL",
"type": "integer",
Expand Down Expand Up @@ -3762,6 +3797,33 @@
}
}
},
"startupProbe": {
"description": "Startup probe configuration.",
"type": "object",
"additionalProperties": false,
"properties": {
"timeoutSeconds": {
"description": "Webserver Startup probe timeout seconds.",
"type": "integer",
"default": 20
},
"failureThreshold": {
"description": "Webserver Startup probe failure threshold.",
"type": "integer",
"default": 6
},
"periodSeconds": {
"description": "Webserver Startup probe period seconds.",
"type": "integer",
"default": 10
},
"scheme": {
"description": "Webserver Startup probe scheme.",
"type": "string",
"default": "HTTP"
}
}
},
"replicas": {
"description": "How many Airflow webserver replicas should run.",
"type": "integer",
Expand Down
14 changes: 14 additions & 0 deletions chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,14 @@ scheduler:
failureThreshold: 5
periodSeconds: 60
command: ~

# Wait for at most 1 minute (6*10s) for the scheduler container to start up.
# The livenessProbe kicks in only after the startupProbe has succeeded.
startupProbe:
failureThreshold: 6
periodSeconds: 10
timeoutSeconds: 20
command: ~
# Airflow 2.0 allows users to run multiple schedulers,
# However this feature is only recommended for MySQL 8+ and Postgres
replicas: 1
Expand Down Expand Up @@ -1066,6 +1074,12 @@ webserver:
periodSeconds: 10
scheme: HTTP

startupProbe:
timeoutSeconds: 20
failureThreshold: 6
periodSeconds: 10
scheme: HTTP

# Number of webservers
replicas: 1
# Max number of old replicasets to retain
Expand Down
42 changes: 42 additions & 0 deletions helm_tests/airflow_core/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,30 @@ def test_livenessprobe_values_are_configurable(self):
"spec.template.spec.containers[0].livenessProbe.exec.command", docs[0]
)

def test_startupprobe_values_are_configurable(self):
    """Custom scheduler startupProbe values must show up in the rendered deployment."""
    startup_probe = {
        "timeoutSeconds": 111,
        "failureThreshold": 222,
        "periodSeconds": 333,
        "command": ["sh", "-c", "echo", "wow such test"],
    }
    docs = render_chart(
        values={"scheduler": {"startupProbe": startup_probe}},
        show_only=["templates/scheduler/scheduler-deployment.yaml"],
    )
    deployment = docs[0]

    # Each entry pairs a jmespath query into the first container with the
    # value that was passed in through the chart values above.
    expectations = [
        ("spec.template.spec.containers[0].startupProbe.timeoutSeconds", 111),
        ("spec.template.spec.containers[0].startupProbe.failureThreshold", 222),
        ("spec.template.spec.containers[0].startupProbe.periodSeconds", 333),
        (
            "spec.template.spec.containers[0].startupProbe.exec.command",
            ["sh", "-c", "echo", "wow such test"],
        ),
    ]
    for query, expected in expectations:
        assert jmespath.search(query, deployment) == expected

@pytest.mark.parametrize(
"airflow_version, probe_command",
[
Expand All @@ -375,6 +399,24 @@ def test_livenessprobe_command_depends_on_airflow_version(self, airflow_version,
in jmespath.search("spec.template.spec.containers[0].livenessProbe.exec.command", docs[0])[-1]
)

@pytest.mark.parametrize(
    "airflow_version, probe_command",
    [
        ("1.9.0", "from airflow.jobs.scheduler_job import SchedulerJob"),
        ("2.1.0", "airflow jobs check --job-type SchedulerJob --hostname $(hostname)"),
        ("2.5.0", "airflow jobs check --job-type SchedulerJob --local"),
    ],
)
def test_startupprobe_command_depends_on_airflow_version(self, airflow_version, probe_command):
    """The default startupProbe script must match the configured Airflow version."""
    chart_values = {"airflowVersion": f"{airflow_version}"}
    docs = render_chart(
        values=chart_values,
        show_only=["templates/scheduler/scheduler-deployment.yaml"],
    )

    rendered_command = jmespath.search(
        "spec.template.spec.containers[0].startupProbe.exec.command", docs[0]
    )
    # The last element of the exec command is the shell script body.
    script = rendered_command[-1]
    assert probe_command in script

@pytest.mark.parametrize(
"log_persistence_values, expected_volume",
[
Expand Down
25 changes: 22 additions & 3 deletions helm_tests/webserver/test_webserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
class TestWebserverDeployment:
"""Tests webserver deployment."""

def test_should_add_host_header_to_liveness_and_readiness_probes(self):
def test_should_add_host_header_to_liveness_and_readiness_and_startup_probes(self):
docs = render_chart(
values={
"config": {
Expand All @@ -41,8 +41,11 @@ def test_should_add_host_header_to_liveness_and_readiness_probes(self):
assert {"name": "Host", "value": "example.com"} in jmespath.search(
"spec.template.spec.containers[0].readinessProbe.httpGet.httpHeaders", docs[0]
)
assert {"name": "Host", "value": "example.com"} in jmespath.search(
"spec.template.spec.containers[0].startupProbe.httpGet.httpHeaders", docs[0]
)

def test_should_add_path_to_liveness_and_readiness_probes(self):
def test_should_add_path_to_liveness_and_readiness_and_startup_probes(self):
docs = render_chart(
values={
"config": {
Expand All @@ -60,6 +63,10 @@ def test_should_add_path_to_liveness_and_readiness_probes(self):
jmespath.search("spec.template.spec.containers[0].readinessProbe.httpGet.path", docs[0])
== "/mypath/path/health"
)
assert (
jmespath.search("spec.template.spec.containers[0].startupProbe.httpGet.path", docs[0])
== "/mypath/path/health"
)

@pytest.mark.parametrize(
"revision_history_limit, global_revision_history_limit",
Expand Down Expand Up @@ -91,6 +98,10 @@ def test_should_not_contain_host_header(self, values):
jmespath.search("spec.template.spec.containers[0].readinessProbe.httpGet.httpHeaders", docs[0])
is None
)
assert (
jmespath.search("spec.template.spec.containers[0].startupProbe.httpGet.httpHeaders", docs[0])
is None
)

def test_should_use_templated_base_url_for_probes(self):
docs = render_chart(
Expand All @@ -111,15 +122,20 @@ def test_should_use_templated_base_url_for_probes(self):
assert {"name": "Host", "value": "release-name.com"} in jmespath.search(
"readinessProbe.httpGet.httpHeaders", container
)
assert {"name": "Host", "value": "release-name.com"} in jmespath.search(
"startupProbe.httpGet.httpHeaders", container
)
assert "/mypath/release-name/path/health" == jmespath.search("livenessProbe.httpGet.path", container)
assert "/mypath/release-name/path/health" == jmespath.search("readinessProbe.httpGet.path", container)
assert "/mypath/release-name/path/health" == jmespath.search("startupProbe.httpGet.path", container)

def test_should_add_scheme_to_liveness_and_readiness_probes(self):
def test_should_add_scheme_to_liveness_and_readiness_and_startup_probes(self):
docs = render_chart(
values={
"webserver": {
"livenessProbe": {"scheme": "HTTPS"},
"readinessProbe": {"scheme": "HTTPS"},
"startupProbe": {"scheme": "HTTPS"},
}
},
show_only=["templates/webserver/webserver-deployment.yaml"],
Expand All @@ -131,6 +147,9 @@ def test_should_add_scheme_to_liveness_and_readiness_probes(self):
assert "HTTPS" in jmespath.search(
"spec.template.spec.containers[0].readinessProbe.httpGet.scheme", docs[0]
)
assert "HTTPS" in jmespath.search(
"spec.template.spec.containers[0].startupProbe.httpGet.scheme", docs[0]
)

def test_should_add_volume_and_volume_mount_when_exist_webserver_config(self):
docs = render_chart(
Expand Down