Skip to content

Commit

Permalink
Fix asyncio loop leaking
Browse files Browse the repository at this point in the history
  • Loading branch information
schattian committed May 11, 2022
1 parent 86171ff commit 67474cc
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
12 changes: 6 additions & 6 deletions airflow/providers/cncf/kubernetes/utils/pod_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,24 +210,24 @@ async def async_await_container_completion() -> None:
while self.container_is_running(pod=pod, container_name=container_name):
await asyncio.sleep(1)

loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()
await_container_completion = loop.create_task(async_await_container_completion())
log_stream = asyncio.ensure_future(loop.run_in_executor(None, self.log_iterable, stream))
tasks: Iterable[asyncio.Task] = {await_container_completion, log_stream}
loop.run_until_complete(asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))
if log_stream.done():
return log_stream.result()

log_stream.cancel()
try:
loop.run_until_complete(log_stream)
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
except concurrent.futures.CancelledError:
self.log.warning(
"Container %s log read was interrupted at some point caused by log rotation "
"see https://github.com/apache/airflow/issues/23497 for reference.",
container_name,
)
return None
return None
else:
return log_stream.result()

def fetch_container_logs(
self, pod: V1Pod, container_name: str, *, follow=False, since_time: Optional[DateTime] = None
Expand Down
4 changes: 2 additions & 2 deletions tests/providers/cncf/kubernetes/utils/test_pod_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ def test_fetch_container_since_time(self, container_running, mock_now):
args, kwargs = self.mock_kube_client.read_namespaced_pod_log.call_args_list[0]
assert kwargs['since_seconds'] == 5

@pytest.mark.parametrize('follow, is_running_calls, exp_running', [(True, 4, False), (False, 1, True)])
@pytest.mark.parametrize('follow, is_running_calls, exp_running', [(True, 3, False), (False, 1, True)])
@mock.patch('airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running')
def test_fetch_container_running_follow(
self, container_running_mock, follow, is_running_calls, exp_running
Expand All @@ -324,7 +324,7 @@ def test_fetch_container_running_follow(
When called with follow=False, should return immediately even though still running.
"""
mock_pod = MagicMock()
container_running_mock.side_effect = [True, False, False, False] # called once when follow=False
container_running_mock.side_effect = [True, False, False] # called once when follow=False
self.mock_kube_client.read_namespaced_pod_log.return_value = [b'2021-01-01 hi']
ret = self.pod_manager.fetch_container_logs(pod=mock_pod, container_name='base', follow=follow)
assert len(container_running_mock.call_args_list) == is_running_calls
Expand Down

0 comments on commit 67474cc

Please sign in to comment.