From ee342b85b97649e2e29fcf83f439279b68f1b4d4 Mon Sep 17 00:00:00 2001 From: Sebastian Chamena <43488475+schattian@users.noreply.github.com> Date: Wed, 11 May 2022 12:20:49 -0700 Subject: [PATCH] Fix k8s pod.execute randomly stuck indefinitely by logs consumption (#23497) (#23618) --- .../cncf/kubernetes/utils/pod_manager.py | 47 +++++++++++++++++-- .../cncf/kubernetes/utils/test_pod_manager.py | 28 ++++++++++- 2 files changed, 68 insertions(+), 7 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 153abee1dbf95..0a10d02d0dc43 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -15,6 +15,8 @@ # specific language governing permissions and limitations # under the License. """Launches PODs""" +import asyncio +import concurrent import json import math import time @@ -193,6 +195,40 @@ def follow_container_logs(self, pod: V1Pod, container_name: str) -> PodLoggingSt ) return self.fetch_container_logs(pod=pod, container_name=container_name, follow=True) + def log_iterable(self, logs: Iterable[bytes]) -> Optional[DateTime]: + timestamp = None + for line in logs: + timestamp, message = self.parse_log_line(line.decode('utf-8', errors="backslashreplace")) + self.log.info(message) + return timestamp + + def consume_container_logs_stream( + self, pod: V1Pod, container_name: str, stream: Iterable[bytes] + ) -> Optional[DateTime]: + async def async_await_container_completion() -> None: + await asyncio.sleep(1) + while self.container_is_running(pod=pod, container_name=container_name): + await asyncio.sleep(1) + + loop = asyncio.get_event_loop() + await_container_completion = loop.create_task(async_await_container_completion()) + log_stream = asyncio.ensure_future(loop.run_in_executor(None, self.log_iterable, stream)) + tasks: Iterable[asyncio.Task] = {await_container_completion, log_stream} + loop.run_until_complete(asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)) + if log_stream.done(): + return log_stream.result() + + log_stream.cancel() + try: + loop.run_until_complete(log_stream) + except concurrent.futures.CancelledError: + self.log.warning( + "Container %s log read was interrupted at some point caused by log rotation " + "see https://github.com/apache/airflow/issues/23497 for reference.", + container_name, + ) + return None + def fetch_container_logs( self, pod: V1Pod, container_name: str, *, follow=False, since_time: Optional[DateTime] = None ) -> PodLoggingStatus: @@ -220,10 +256,11 @@ def consume_logs(*, since_time: Optional[DateTime] = None, follow: bool = True) ), follow=follow, ) - for raw_line in logs: - line = raw_line.decode('utf-8', errors="backslashreplace") - timestamp, message = self.parse_log_line(line) - self.log.info(message) + if follow: + timestamp = self.consume_container_logs_stream(pod, container_name, logs) + else: + timestamp = self.log_iterable(logs) + except BaseHTTPError as e: self.log.warning( "Reading of logs interrupted with error %r; will retry. " @@ -256,7 +293,7 @@ def consume_logs(*, since_time: Optional[DateTime] = None, follow: bool = True) time.sleep(1) def await_container_completion(self, pod: V1Pod, container_name: str) -> None: - while not self.container_is_running(pod=pod, container_name=container_name): + while self.container_is_running(pod=pod, container_name=container_name): time.sleep(1) def await_pod_completion(self, pod: V1Pod) -> V1Pod: diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py index 8070c3c3532b5..7d105a1ac3a69 100644 --- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py +++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py @@ -15,6 +15,8 @@ # specific language governing permissions and limitations # under the License. import logging +import time +from typing import Generator from unittest import mock from unittest.mock import MagicMock @@ -312,7 +314,7 @@ def test_fetch_container_since_time(self, container_running, mock_now): args, kwargs = self.mock_kube_client.read_namespaced_pod_log.call_args_list[0] assert kwargs['since_seconds'] == 5 - @pytest.mark.parametrize('follow, is_running_calls, exp_running', [(True, 3, False), (False, 1, True)]) + @pytest.mark.parametrize('follow, is_running_calls, exp_running', [(True, 4, False), (False, 1, True)]) @mock.patch('airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running') def test_fetch_container_running_follow( self, container_running_mock, follow, is_running_calls, exp_running @@ -322,13 +324,35 @@ def test_fetch_container_running_follow( When called with follow=False, should return immediately even though still running. """ mock_pod = MagicMock() - container_running_mock.side_effect = [True, True, False] # only will be called once + container_running_mock.side_effect = [True, False, False, False] # called once when follow=False self.mock_kube_client.read_namespaced_pod_log.return_value = [b'2021-01-01 hi'] ret = self.pod_manager.fetch_container_logs(pod=mock_pod, container_name='base', follow=follow) assert len(container_running_mock.call_args_list) == is_running_calls assert ret.last_log_time == DateTime(2021, 1, 1, tzinfo=Timezone('UTC')) assert ret.running is exp_running + @pytest.mark.parametrize('follow, is_running_calls, exp_running', [(True, 3, False)]) + @mock.patch('airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running') + def test_fetch_container_running_follow_when_kube_api_hangs( + self, container_running_mock, follow, is_running_calls, exp_running + ): + """ + When called with follow, should keep looping even after disconnections, if pod still running. + """ + mock_pod = MagicMock() + container_running_mock.side_effect = [True, False, False] + + def stream_logs() -> Generator: + while True: + time.sleep(1) # this is intentional: urllib3.response.stream() is not async + yield b'2021-01-01 hi' + + self.mock_kube_client.read_namespaced_pod_log.return_value = stream_logs() + ret = self.pod_manager.fetch_container_logs(pod=mock_pod, container_name='base', follow=follow) + assert len(container_running_mock.call_args_list) == is_running_calls + assert ret.running is exp_running + assert ret.last_log_time is None + def params_for_test_container_is_running(): """The `container_is_running` method is designed to handle an assortment of bad objects