-
Notifications
You must be signed in to change notification settings - Fork 180
/
Copy pathkubernetes.py
270 lines (211 loc) · 10.3 KB
/
kubernetes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
from __future__ import annotations
from os import PathLike
from typing import Any, Callable, Sequence
from airflow.models import TaskInstance
from airflow.utils.context import Context, context_merge
from cosmos.config import ProfileConfig
from cosmos.dbt.parser.output import extract_log_issues
from cosmos.operators.base import (
AbstractDbtBaseOperator,
DbtBuildMixin,
DbtCloneMixin,
DbtLSMixin,
DbtRunMixin,
DbtRunOperationMixin,
DbtSeedMixin,
DbtSnapshotMixin,
DbtSourceMixin,
DbtTestMixin,
)
# Marker dbt prints in its final log line when the test selection matched nothing
# (checked against the last pod log line in DbtTestKubernetesOperator._handle_warnings).
DBT_NO_TESTS_MSG = "Nothing to do"
# Token in dbt's summary line used both as a substring check and as the "WARN=<n>"
# prefix that the warning count is parsed from.
DBT_WARN_MSG = "WARN"
try:
    # apache-airflow-providers-cncf-kubernetes >= 7.4.0
    from airflow.providers.cncf.kubernetes.backcompat.backwards_compat_converters import (
        convert_env_vars,
    )
    from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
    from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction
except ImportError:
    try:
        # apache-airflow-providers-cncf-kubernetes < 7.4.0
        from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
    except ImportError:
        # Fix: the original message contained a duplicated word ("with with").
        raise ImportError(
            "Could not import KubernetesPodOperator. Ensure you've installed the Kubernetes provider "
            "separately or with `pip install astronomer-cosmos[...,kubernetes]`."
        )
class DbtKubernetesBaseOperator(AbstractDbtBaseOperator, KubernetesPodOperator):  # type: ignore
    """
    Executes a dbt core cli command in a Kubernetes Pod.
    """

    template_fields: Sequence[str] = tuple(
        list(AbstractDbtBaseOperator.template_fields) + list(KubernetesPodOperator.template_fields)
    )

    intercept_flag = False

    def __init__(self, profile_config: ProfileConfig | None = None, **kwargs: Any) -> None:
        self.profile_config = profile_config
        super().__init__(**kwargs)

    def build_env_args(self, env: dict[str, str | bytes | PathLike[Any]]) -> None:
        """
        Merge *env* with the env vars already configured on the operator and store
        the result on ``self.env_vars`` in the Kubernetes representation.

        Entries already present on ``self.env_vars`` override same-named keys in *env*.
        """
        merged: dict[str, str] = {key: str(value) for key, value in env.items()}
        merged.update({var.name: var.value for var in self.env_vars})
        self.env_vars: list[Any] = convert_env_vars(merged)

    def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> Any:
        """Assemble the dbt command for the pod, then run it via KubernetesPodOperator."""
        self.build_kube_args(context, cmd_flags)
        self.log.info(f"Running command: {self.arguments}")
        result = KubernetesPodOperator.execute(self, context)
        self.log.info(result)

    def build_kube_args(self, context: Context, cmd_flags: list[str] | None = None) -> None:
        """
        Translate the operator configuration into the pod's ``arguments`` and env vars.

        :param context: Airflow context the command runs under.
        :param cmd_flags: Extra flags appended to the dbt command, if any.
        """
        # For the first round, we're going to assume that the command is dbt
        # This means that we don't have openlineage support, but we will create a ticket
        # to add that in the future
        self.dbt_executable_path = "dbt"
        dbt_cmd, env_vars = self.build_cmd(context=context, cmd_flags=cmd_flags)

        # Append profile/target/project-dir flags derived from the configuration.
        if self.profile_config and self.profile_config.profile_name:
            dbt_cmd += ["--profile", self.profile_config.profile_name]
        if self.profile_config and self.profile_config.target_name:
            dbt_cmd += ["--target", self.profile_config.target_name]
        if self.project_dir:
            dbt_cmd += ["--project-dir", str(self.project_dir)]

        # set env vars
        self.build_env_args(env_vars)
        self.arguments = dbt_cmd
class DbtBuildKubernetesOperator(DbtBuildMixin, DbtKubernetesBaseOperator):
    """
    Executes a dbt core build command.
    """

    # No-op __init__ removed: the mixin/base chain is invoked directly via the MRO.
    template_fields: Sequence[str] = DbtKubernetesBaseOperator.template_fields + DbtBuildMixin.template_fields  # type: ignore[operator]
class DbtLSKubernetesOperator(DbtLSMixin, DbtKubernetesBaseOperator):
    """
    Executes a dbt core ls command.
    """

    # No-op __init__ removed: the mixin/base chain is invoked directly via the MRO.
class DbtSeedKubernetesOperator(DbtSeedMixin, DbtKubernetesBaseOperator):
    """
    Executes a dbt core seed command.
    """

    # No-op __init__ removed: the mixin/base chain is invoked directly via the MRO.
    template_fields: Sequence[str] = DbtKubernetesBaseOperator.template_fields + DbtSeedMixin.template_fields  # type: ignore[operator]
class DbtSnapshotKubernetesOperator(DbtSnapshotMixin, DbtKubernetesBaseOperator):
    """
    Executes a dbt core snapshot command.
    """

    # No-op __init__ removed: the mixin/base chain is invoked directly via the MRO.
class DbtSourceKubernetesOperator(DbtSourceMixin, DbtKubernetesBaseOperator):
    """
    Executes a dbt source freshness command.
    """

    # No-op __init__ removed: the mixin/base chain is invoked directly via the MRO.
class DbtRunKubernetesOperator(DbtRunMixin, DbtKubernetesBaseOperator):
    """
    Executes a dbt core run command.
    """

    # No-op __init__ removed: the mixin/base chain is invoked directly via the MRO.
    template_fields: Sequence[str] = DbtKubernetesBaseOperator.template_fields + DbtRunMixin.template_fields  # type: ignore[operator]
class DbtTestKubernetesOperator(DbtTestMixin, DbtKubernetesBaseOperator):
    """
    Executes a dbt core test command.

    When ``on_warning_callback`` is supplied, the pod is kept alive after the run so
    its logs can be inspected for dbt test warnings; the callback is then invoked
    with ``test_names``/``test_results`` merged into the context, and the pod is
    cleaned up according to the originally requested finish action.
    """

    def __init__(self, on_warning_callback: Callable[..., Any] | None = None, **kwargs: Any) -> None:
        if not on_warning_callback:
            super().__init__(**kwargs)
        else:
            self.on_warning_callback = on_warning_callback
            # Remember the caller's requested cleanup behavior so we can apply it
            # ourselves after reading the pod logs.
            self.is_delete_operator_pod_original = kwargs.get("is_delete_operator_pod", None)
            if self.is_delete_operator_pod_original is not None:
                self.on_finish_action_original = (
                    OnFinishAction.DELETE_POD if self.is_delete_operator_pod_original else OnFinishAction.KEEP_POD
                )
            else:
                self.on_finish_action_original = OnFinishAction(kwargs.get("on_finish_action", "delete_pod"))
                self.is_delete_operator_pod_original = self.on_finish_action_original == OnFinishAction.DELETE_POD

            # In order to read the pod logs, we need to keep the pod around.
            # Depending on the on_finish_action & is_delete_operator_pod settings,
            # we will clean up the pod later in the _handle_warnings method, which
            # is called in on_success_callback.
            kwargs["is_delete_operator_pod"] = False
            kwargs["on_finish_action"] = OnFinishAction.KEEP_POD

            # Add an additional callback to both success and failure callbacks.
            # In case of success, check for a warning in the logs and clean up the pod.
            self.on_success_callback = kwargs.get("on_success_callback", None) or []
            if isinstance(self.on_success_callback, list):
                self.on_success_callback += [self._handle_warnings]
            else:
                self.on_success_callback = [self.on_success_callback, self._handle_warnings]
            kwargs["on_success_callback"] = self.on_success_callback

            # In case of failure, clean up the pod.
            self.on_failure_callback = kwargs.get("on_failure_callback", None) or []
            if isinstance(self.on_failure_callback, list):
                self.on_failure_callback += [self._cleanup_pod]
            else:
                self.on_failure_callback = [self.on_failure_callback, self._cleanup_pod]
            kwargs["on_failure_callback"] = self.on_failure_callback

            super().__init__(**kwargs)

    def _handle_warnings(self, context: Context) -> None:
        """
        Handles warnings by extracting log issues, creating additional context, and calling the
        on_warning_callback with the updated context.

        :param context: The original airflow context in which the build and run command was executed.
        """
        if not (
            isinstance(context["task_instance"], TaskInstance)
            and isinstance(context["task_instance"].task, DbtTestKubernetesOperator)
        ):
            return
        task = context["task_instance"].task

        logs = [
            log.decode("utf-8") for log in task.pod_manager.read_pod_logs(task.pod, "base") if log.decode("utf-8") != ""
        ]

        # Bugfix: the original built `all([logs, ..., logs[-1] ...])` eagerly, so an
        # empty log list raised IndexError before `all` could short-circuit. The
        # boolean chain below evaluates lazily and only indexes logs[-1] when logs
        # is non-empty.
        should_trigger_callback = bool(
            logs
            and self.on_warning_callback
            and DBT_NO_TESTS_MSG not in logs[-1]
            and DBT_WARN_MSG in logs[-1]
        )

        if should_trigger_callback:
            warnings = int(logs[-1].split(f"{DBT_WARN_MSG}=")[1].split()[0])
            if warnings > 0:
                test_names, test_results = extract_log_issues(logs)
                context_merge(context, test_names=test_names, test_results=test_results)
                self.on_warning_callback(context)

        self._cleanup_pod(context)

    def _cleanup_pod(self, context: Context) -> None:
        """
        Handles the cleaning up of the pod after success or failure, if
        there is a on_warning_callback function defined.

        :param context: The original airflow context in which the build and run command was executed.
        """
        if not (
            isinstance(context["task_instance"], TaskInstance)
            and isinstance(context["task_instance"].task, DbtTestKubernetesOperator)
        ):
            return
        task = context["task_instance"].task
        if task.pod:
            # Restore the caller's originally requested finish action before cleanup.
            task.on_finish_action = self.on_finish_action_original
            task.cleanup(pod=task.pod, remote_pod=task.remote_pod)
class DbtRunOperationKubernetesOperator(DbtRunOperationMixin, DbtKubernetesBaseOperator):
    """
    Executes a dbt core run-operation command.
    """

    # No-op __init__ removed: the mixin/base chain is invoked directly via the MRO.
    template_fields: Sequence[str] = DbtKubernetesBaseOperator.template_fields + DbtRunOperationMixin.template_fields  # type: ignore[operator]
class DbtCloneKubernetesOperator(DbtCloneMixin, DbtKubernetesBaseOperator):
    """Executes a dbt core clone command."""

    # No-op __init__ removed: the mixin/base chain is invoked directly via the MRO.