diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 0a10d02d0dc43..153abee1dbf95 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -15,8 +15,6 @@ # specific language governing permissions and limitations # under the License. """Launches PODs""" -import asyncio -import concurrent import json import math import time @@ -195,40 +193,6 @@ def follow_container_logs(self, pod: V1Pod, container_name: str) -> PodLoggingSt ) return self.fetch_container_logs(pod=pod, container_name=container_name, follow=True) - def log_iterable(self, logs: Iterable[bytes]) -> Optional[DateTime]: - timestamp = None - for line in logs: - timestamp, message = self.parse_log_line(line.decode('utf-8', errors="backslashreplace")) - self.log.info(message) - return timestamp - - def consume_container_logs_stream( - self, pod: V1Pod, container_name: str, stream: Iterable[bytes] - ) -> Optional[DateTime]: - async def async_await_container_completion() -> None: - await asyncio.sleep(1) - while self.container_is_running(pod=pod, container_name=container_name): - await asyncio.sleep(1) - - loop = asyncio.get_event_loop() - await_container_completion = loop.create_task(async_await_container_completion()) - log_stream = asyncio.ensure_future(loop.run_in_executor(None, self.log_iterable, stream)) - tasks: Iterable[asyncio.Task] = {await_container_completion, log_stream} - loop.run_until_complete(asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)) - if log_stream.done(): - return log_stream.result() - - log_stream.cancel() - try: - loop.run_until_complete(log_stream) - except concurrent.futures.CancelledError: - self.log.warning( - "Container %s log read was interrupted at some point caused by log rotation " - "see https://github.com/apache/airflow/issues/23497 for reference.", - container_name, - ) - return None - def fetch_container_logs( self, pod: V1Pod, container_name: str, *, follow=False, since_time: Optional[DateTime] = None ) -> PodLoggingStatus: @@ -256,11 +220,10 @@ def consume_logs(*, since_time: Optional[DateTime] = None, follow: bool = True) ), follow=follow, ) - if follow: - timestamp = self.consume_container_logs_stream(pod, container_name, logs) - else: - timestamp = self.log_iterable(logs) - + for raw_line in logs: + line = raw_line.decode('utf-8', errors="backslashreplace") + timestamp, message = self.parse_log_line(line) + self.log.info(message) except BaseHTTPError as e: self.log.warning( "Reading of logs interrupted with error %r; will retry. " @@ -293,7 +256,7 @@ def consume_logs(*, since_time: Optional[DateTime] = None, follow: bool = True) time.sleep(1) def await_container_completion(self, pod: V1Pod, container_name: str) -> None: - while self.container_is_running(pod=pod, container_name=container_name): + while not self.container_is_running(pod=pod, container_name=container_name): time.sleep(1) def await_pod_completion(self, pod: V1Pod) -> V1Pod: diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py index 7d105a1ac3a69..8070c3c3532b5 100644 --- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py +++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py @@ -15,8 +15,6 @@ # specific language governing permissions and limitations # under the License. import logging -import time -from typing import Generator from unittest import mock from unittest.mock import MagicMock @@ -314,7 +312,7 @@ def test_fetch_container_since_time(self, container_running, mock_now): args, kwargs = self.mock_kube_client.read_namespaced_pod_log.call_args_list[0] assert kwargs['since_seconds'] == 5 - @pytest.mark.parametrize('follow, is_running_calls, exp_running', [(True, 4, False), (False, 1, True)]) + @pytest.mark.parametrize('follow, is_running_calls, exp_running', [(True, 3, False), (False, 1, True)]) @mock.patch('airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running') def test_fetch_container_running_follow( self, container_running_mock, follow, is_running_calls, exp_running @@ -324,35 +322,13 @@ def test_fetch_container_running_follow( When called with follow=False, should return immediately even though still running. """ mock_pod = MagicMock() - container_running_mock.side_effect = [True, False, False, False] # called once when follow=False + container_running_mock.side_effect = [True, True, False] # only will be called once self.mock_kube_client.read_namespaced_pod_log.return_value = [b'2021-01-01 hi'] ret = self.pod_manager.fetch_container_logs(pod=mock_pod, container_name='base', follow=follow) assert len(container_running_mock.call_args_list) == is_running_calls assert ret.last_log_time == DateTime(2021, 1, 1, tzinfo=Timezone('UTC')) assert ret.running is exp_running - @pytest.mark.parametrize('follow, is_running_calls, exp_running', [(True, 3, False)]) - @mock.patch('airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running') - def test_fetch_container_running_follow_when_kube_api_hangs( - self, container_running_mock, follow, is_running_calls, exp_running - ): - """ - When called with follow, should keep looping even after disconnections, if pod still running. - """ - mock_pod = MagicMock() - container_running_mock.side_effect = [True, False, False] - - def stream_logs() -> Generator: - while True: - time.sleep(1) # this is intentional: urllib3.response.stream() is not async - yield b'2021-01-01 hi' - - self.mock_kube_client.read_namespaced_pod_log.return_value = stream_logs() - ret = self.pod_manager.fetch_container_logs(pod=mock_pod, container_name='base', follow=follow) - assert len(container_running_mock.call_args_list) == is_running_calls - assert ret.running is exp_running - assert ret.last_log_time is None - def params_for_test_container_is_running(): """The `container_is_running` method is designed to handle an assortment of bad objects