-
Notifications
You must be signed in to change notification settings - Fork 92
/
Copy pathconfigmap.yaml
142 lines (121 loc) · 4.7 KB
/
configmap.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
################################
## Airflow ConfigMap
#################################
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ template "airflow_config" . }}
labels:
tier: airflow
component: config
release: {{ .Release.Name }}
chart: "{{ .Chart.Name }}-{{ .Chart.Version }}"
heritage: {{ .Release.Service }}
{{- with .Values.labels }}
{{ toYaml . | indent 4 }}
{{- end }}
data:
# These are system-specified config overrides.
airflow.cfg: |
[core]
load_examples = False
colored_console_log = False
executor = {{ .Values.executor }}
{{- if .Values.elasticsearch.enabled }}
remote_logging = True
{{- end }}
[webserver]
base_url = {{ printf "https://%s" (include "airflow_url" .) }}
expose_config = True
rbac = True
{{- if eq .Values.executor "CeleryExecutor" }}
[celery]
default_queue = celery
{{- end }}
[scheduler]
scheduler_heartbeat_sec = 5
statsd_on = True
statsd_port = 9125
statsd_prefix = airflow
statsd_host = {{ printf "%s-statsd" .Release.Name }}
# Restart Scheduler every 41460 seconds (11 hours 31 minutes)
# The odd time is chosen so it is not always restarting on the same "hour" boundary
run_duration = 41460
{{- if .Values.elasticsearch.enabled }}
[elasticsearch]
# The elasticsearch variables were updated to the shorter names in v1.10.4
write_stdout = True
json_format = True
log_id_template = {dag_id}_{task_id}_{execution_date}_{try_number}
# This is the older format for these variable names, kept here for backward compatibility
elasticsearch_write_stdout = True
elasticsearch_json_format = True
elasticsearch_log_id_template = {dag_id}_{task_id}_{execution_date}_{try_number}
[elasticsearch_configs]
max_retries = 3
timeout = 30
retry_timeout = True
{{- end }}
{{- if eq .Values.executor "KubernetesExecutor" }}
[kubernetes]
namespace = {{ .Release.Namespace }}
airflow_configmap = {{ include "airflow_config" . }}
airflow_local_settings_configmap = {{ include "airflow_config" . }}
worker_container_repository = {{ .Values.images.airflow.repository }}
worker_container_tag = {{ .Values.images.airflow.tag }}
worker_container_image_pull_policy = {{ .Values.images.airflow.pullPolicy }}
worker_service_account_name = {{ .Release.Name }}-worker-serviceaccount
image_pull_secrets = {{ template "registry_secret" . }}
dags_in_image = True
delete_worker_pods = True
[kubernetes_secrets]
AIRFLOW__CORE__SQL_ALCHEMY_CONN = {{ printf "%s=connection" (include "airflow_metadata_secret" .) }}
AIRFLOW__CORE__FERNET_KEY = {{ printf "%s=fernet-key" (include "fernet_key_secret" .) }}
{{- if .Values.elasticsearch.enabled }}
AIRFLOW__ELASTICSEARCH__ELASTICSEARCH_HOST = {{ printf "%s=connection" (include "elasticsearch_secret" .) }}
{{- end }}
[kubernetes_labels]
tier = airflow
component = worker
release = {{ .Release.Name }}
platform = {{ .Values.platform.release }}
workspace = {{ .Values.platform.workspace }}
{{- end }}
[astronomer]
{{- if .Values.webserver.jwtSigningCertificateSecretName }}
jwt_signing_cert = {{ template "airflow_webserver_jwt_cert_path" . }}
jwt_audience = {{ include "airflow_url" . }}
{{- end }}
webserver_config.py: |
import os
from airflow import configuration as conf
from flask_appbuilder.security.manager import AUTH_REMOTE_USER
basedir = os.path.abspath(os.path.dirname(__file__))
# The SQLAlchemy connection string.
SQLALCHEMY_DATABASE_URI = conf.get('core', 'SQL_ALCHEMY_CONN')
# Flask-WTF flag for CSRF
CSRF_ENABLED = True
# ----------------------------------------------------
# AUTHENTICATION CONFIG
# ----------------------------------------------------
{{- if .Values.webserver.jwtSigningCertificateSecretName }}
AUTH_TYPE = AUTH_REMOTE_USER
from astronomer.flask_appbuilder.security import AirflowAstroSecurityManager
SECURITY_MANAGER_CLASS = AirflowAstroSecurityManager
{{- end }}
airflow_local_settings.py: |
from airflow.contrib.kubernetes.pod import Pod
from airflow.configuration import conf
def pod_mutation_hook(pod: Pod):
extra_labels = {
"kubernetes-executor": "False",
"kubernetes-pod-operator": "False"
}
if 'airflow-worker' in pod.labels.keys() or \
conf.get('core', 'EXECUTOR') == "KubernetesExecutor":
extra_labels["kubernetes-executor"] = "True"
else:
extra_labels["kubernetes-pod-operator"] = "True"
pod.labels.update(extra_labels)
pod.tolerations += {{ toJson .Values.podMutation.tolerations }}
pod.affinity.update({{ toJson .Values.podMutation.affinity }})