Skip to content

Commit

Permalink
Consider custom pod labels on pod finding process on `KubernetesPodOp…
Browse files Browse the repository at this point in the history
…erator` (apache#33057)

* consider custom pod labels on pod finding process on KubernetesPodOperator

---------

Co-authored-by: eladkal <[email protected]>
  • Loading branch information
okayhooni and eladkal authored Aug 4, 2023
1 parent d31c775 commit 164526d
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
5 changes: 4 additions & 1 deletion airflow/providers/cncf/kubernetes/operators/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,10 @@ def process_pod_deletion(self, pod: k8s.V1Pod, *, reraise=True):
self.log.info("Skipping deleting pod: %s", pod.metadata.name)

def _build_find_pod_label_selector(self, context: Context | None = None, *, exclude_checked=True) -> str:
labels = self._get_ti_pod_labels(context, include_try_number=False)
labels = {
**self.labels,
**self._get_ti_pod_labels(context, include_try_number=False),
}
label_strings = [f"{label_id}={label}" for label_id, label in sorted(labels.items())]
labels_value = ",".join(label_strings)
if exclude_checked:
Expand Down
12 changes: 11 additions & 1 deletion tests/providers/cncf/kubernetes/operators/test_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,16 @@ def test_labels_mapped(self):
"airflow_kpo_in_cluster": str(k.hook.is_in_cluster),
}

def test_find_custom_pod_labels(self):
k = KubernetesPodOperator(
labels={"foo": "bar", "hello": "airflow"},
name="test",
task_id="task",
)
context = create_context(k)
label_selector = k._build_find_pod_label_selector(context)
assert "foo=bar" in label_selector and "hello=airflow" in label_selector

@patch(HOOK_CLASS, new=MagicMock)
def test_find_pod_labels(self):
k = KubernetesPodOperator(
Expand All @@ -327,7 +337,7 @@ def test_find_pod_labels(self):
self.run_pod(k)
_, kwargs = k.client.list_namespaced_pod.call_args
assert kwargs["label_selector"] == (
"dag_id=dag,kubernetes_pod_operator=True,run_id=test,task_id=task,"
"dag_id=dag,foo=bar,kubernetes_pod_operator=True,run_id=test,task_id=task,"
"already_checked!=True,!airflow-worker"
)

Expand Down

0 comments on commit 164526d

Please sign in to comment.