Skip to content

Commit

Permalink
Add random_name_suffix to SparkKubernetesOperator (#43800) (#43847)
Browse files Browse the repository at this point in the history
* Add random_name_suffix to SparkKubernetesOperator (#43800)

Prior to this change, `random_name_suffix` was only documented but not implemented as a configurable option. Passing this value as an argument had no effect. This commit introduces a `false` option for `random_name_suffix`, which prevents the generation of a random suffix for the pod name. For compatibility, the default value is set to `true`, ensuring the pod name will still conform to `MAX_LABEL_LEN = 63`.

Fixes: #43800

* Add random_name_suffix to SparkKubernetesOperator (#43800)

Prior to this change, `random_name_suffix` was only documented but not implemented as a configurable option. Passing this value as an argument had no effect. This commit introduces a `false` option for `random_name_suffix`, which prevents the generation of a random suffix for the pod name. For compatibility, the default value is set to `true`, ensuring the pod name will still conform to `MAX_LABEL_LEN = 63`.

Fixes: #43800
  • Loading branch information
mrk-andreev authored Nov 9, 2024
1 parent 229750d commit 63b2bbd
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class SparkKubernetesOperator(KubernetesPodOperator):
state, or the execution is interrupted. If True (default), delete the
pod; if False, leave the pod.
:param kubernetes_conn_id: the connection to Kubernetes cluster
:param random_name_suffix: If True, adds a random suffix to the pod name
"""

template_fields = ["application_file", "namespace", "template_spec", "kubernetes_conn_id"]
Expand All @@ -94,6 +95,7 @@ def __init__(
reattach_on_restart: bool = True,
delete_on_termination: bool = True,
kubernetes_conn_id: str = "kubernetes_default",
random_name_suffix: bool = True,
**kwargs,
) -> None:
if kwargs.get("xcom_push") is not None:
Expand All @@ -112,6 +114,7 @@ def __init__(
self.get_logs = get_logs
self.log_events_on_failure = log_events_on_failure
self.success_run_history_limit = success_run_history_limit
self.random_name_suffix = random_name_suffix

if self.base_container_name != self.BASE_CONTAINER_NAME:
self.log.warning(
Expand Down Expand Up @@ -164,7 +167,11 @@ def create_job_name(self):
self.name or self.template_body.get("spark", {}).get("metadata", {}).get("name") or self.task_id
)

updated_name = add_unique_suffix(name=name, max_len=MAX_LABEL_LEN)
if self.random_name_suffix:
updated_name = add_unique_suffix(name=name, max_len=MAX_LABEL_LEN)
else:
# truncation is required to maintain the same behavior as before
updated_name = name[:MAX_LABEL_LEN]

return self._set_name(updated_name)

Expand Down
26 changes: 26 additions & 0 deletions providers/tests/cncf/kubernetes/operators/test_spark_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from datetime import date
from unittest import mock
from unittest.mock import patch
from uuid import uuid4

import pendulum
import pytest
Expand All @@ -31,6 +32,7 @@
from airflow import DAG
from airflow.models import Connection, DagRun, TaskInstance
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.pod_generator import MAX_LABEL_LEN
from airflow.utils import db, timezone
from airflow.utils.types import DagRunType

Expand Down Expand Up @@ -828,3 +830,27 @@ def test_resolve_application_file_real_file_not_exists(create_task_instance_of_o
task: SparkKubernetesOperator = ti.task
with pytest.raises(TypeError, match="application_file body can't transformed into the dictionary"):
_ = task.template_body


@pytest.mark.parametrize(
"random_name_suffix",
[pytest.param(True, id="use-random_name_suffix"), pytest.param(False, id="skip-random_name_suffix")],
)
def test_create_job_name(random_name_suffix: bool):
name = f"x{uuid4()}"
op = SparkKubernetesOperator(task_id="task_id", name=name, random_name_suffix=random_name_suffix)
pod_name = op.create_job_name()

if random_name_suffix:
assert pod_name.startswith(name)
assert pod_name != name
else:
assert pod_name == name


def test_create_job_name_should_truncate_long_names():
long_name = f"{uuid4()}" + "x" * MAX_LABEL_LEN
op = SparkKubernetesOperator(task_id="task_id", name=long_name, random_name_suffix=False)
pod_name = op.create_job_name()

assert pod_name == long_name[:MAX_LABEL_LEN]

0 comments on commit 63b2bbd

Please sign in to comment.