Skip to content

Commit

Permalink
Allow DockerOperator.skip_on_exit_code to be zero (#36360)
Browse files Browse the repository at this point in the history
  • Loading branch information
dolfinus authored Dec 21, 2023
1 parent b2f1882 commit d039437
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 17 deletions.
2 changes: 1 addition & 1 deletion airflow/providers/docker/operators/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ def __init__(
skip_on_exit_code
if isinstance(skip_on_exit_code, Container)
else [skip_on_exit_code]
if skip_on_exit_code
if skip_on_exit_code is not None
else []
)
self.port_bindings = port_bindings or {}
Expand Down
23 changes: 18 additions & 5 deletions tests/providers/docker/decorators/test_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,29 @@ def do_run():
assert dag.task_ids[-1] == "do_run__20"

@pytest.mark.parametrize(
"extra_kwargs, actual_exit_code, expected_state",
"kwargs, actual_exit_code, expected_state",
[
(None, 99, TaskInstanceState.FAILED),
({}, 0, TaskInstanceState.SUCCESS),
({}, 100, TaskInstanceState.FAILED),
({}, 101, TaskInstanceState.FAILED),
({"skip_on_exit_code": None}, 0, TaskInstanceState.SUCCESS),
({"skip_on_exit_code": None}, 100, TaskInstanceState.FAILED),
({"skip_on_exit_code": None}, 101, TaskInstanceState.FAILED),
({"skip_on_exit_code": 100}, 0, TaskInstanceState.SUCCESS),
({"skip_on_exit_code": 100}, 100, TaskInstanceState.SKIPPED),
({"skip_on_exit_code": 100}, 101, TaskInstanceState.FAILED),
({"skip_on_exit_code": None}, 0, TaskInstanceState.SUCCESS),
({"skip_on_exit_code": 0}, 0, TaskInstanceState.SKIPPED),
({"skip_on_exit_code": [100]}, 0, TaskInstanceState.SUCCESS),
({"skip_on_exit_code": [100]}, 100, TaskInstanceState.SKIPPED),
({"skip_on_exit_code": [100]}, 101, TaskInstanceState.FAILED),
({"skip_on_exit_code": [100, 102]}, 101, TaskInstanceState.FAILED),
({"skip_on_exit_code": (100,)}, 0, TaskInstanceState.SUCCESS),
({"skip_on_exit_code": (100,)}, 100, TaskInstanceState.SKIPPED),
({"skip_on_exit_code": (100,)}, 101, TaskInstanceState.FAILED),
],
)
def test_skip_docker_operator(self, extra_kwargs, actual_exit_code, expected_state, dag_maker):
@task.docker(image="python:3.9-slim", auto_remove="force", **(extra_kwargs or {}))
def test_skip_docker_operator(self, kwargs, actual_exit_code, expected_state, dag_maker):
@task.docker(image="python:3.9-slim", auto_remove="force", **kwargs)
def f(exit_code):
raise SystemExit(exit_code)

Expand Down
27 changes: 16 additions & 11 deletions tests/providers/docker/operators/test_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,27 +527,32 @@ def test_execute_unicode_logs(self):
print_exception_mock.assert_not_called()

@pytest.mark.parametrize(
"extra_kwargs, actual_exit_code, expected_exc",
"kwargs, actual_exit_code, expected_exc",
[
(None, 99, AirflowException),
({}, 0, None),
({}, 100, AirflowException),
({}, 101, AirflowException),
({"skip_on_exit_code": None}, 0, None),
({"skip_on_exit_code": None}, 100, AirflowException),
({"skip_on_exit_code": None}, 101, AirflowException),
({"skip_on_exit_code": 100}, 0, None),
({"skip_on_exit_code": 100}, 100, AirflowSkipException),
({"skip_on_exit_code": 100}, 101, AirflowException),
({"skip_on_exit_code": None}, 100, AirflowException),
({"skip_on_exit_code": 0}, 0, AirflowSkipException),
({"skip_on_exit_code": [100]}, 0, None),
({"skip_on_exit_code": [100]}, 100, AirflowSkipException),
({"skip_on_exit_code": (100, 101)}, 100, AirflowSkipException),
({"skip_on_exit_code": 100}, 101, AirflowException),
({"skip_on_exit_code": [100]}, 101, AirflowException),
({"skip_on_exit_code": [100, 102]}, 101, AirflowException),
({"skip_on_exit_code": None}, 0, None),
({"skip_on_exit_code": (100,)}, 0, None),
({"skip_on_exit_code": (100,)}, 100, AirflowSkipException),
({"skip_on_exit_code": (100,)}, 101, AirflowException),
],
)
def test_skip(self, extra_kwargs, actual_exit_code, expected_exc):
def test_skip(self, kwargs, actual_exit_code, expected_exc):
msg = {"StatusCode": actual_exit_code}
self.client_mock.wait.return_value = msg

kwargs = dict(image="ubuntu", owner="unittest", task_id="unittest")
if extra_kwargs:
kwargs.update(**extra_kwargs)
operator = DockerOperator(**kwargs)
operator = DockerOperator(image="ubuntu", owner="unittest", task_id="unittest", **kwargs)

if expected_exc is None:
operator.execute({})
Expand Down

0 comments on commit d039437

Please sign in to comment.