Skip to content

Commit

Permalink
Fix k8s pod.execute randomly stuck indefinitely by logs consumption (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
schattian authored May 11, 2022
1 parent e16eca2 commit ee342b8
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 7 deletions.
47 changes: 42 additions & 5 deletions airflow/providers/cncf/kubernetes/utils/pod_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# specific language governing permissions and limitations
# under the License.
"""Launches PODs"""
import asyncio
import concurrent
import json
import math
import time
Expand Down Expand Up @@ -193,6 +195,40 @@ def follow_container_logs(self, pod: V1Pod, container_name: str) -> PodLoggingSt
)
return self.fetch_container_logs(pod=pod, container_name=container_name, follow=True)

def log_iterable(self, logs: Iterable[bytes]) -> Optional[DateTime]:
    """Emit every line of ``logs`` at INFO level.

    Each raw line is decoded as UTF-8 (undecodable bytes are backslash-escaped)
    and split into a timestamp and message by ``parse_log_line``.

    :param logs: iterable of raw log lines as bytes.
    :return: timestamp parsed from the last line consumed, or ``None`` if the
        iterable was empty.
    """
    last_timestamp = None
    for raw_line in logs:
        decoded = raw_line.decode('utf-8', errors="backslashreplace")
        last_timestamp, message = self.parse_log_line(decoded)
        self.log.info(message)
    return last_timestamp

def consume_container_logs_stream(
    self, pod: V1Pod, container_name: str, stream: Iterable[bytes]
) -> Optional[DateTime]:
    """Drain the container's log ``stream``, guarding against a hung read.

    The blocking stream read runs in a thread-pool executor and is raced
    against a coroutine that polls container state. This prevents the caller
    from being stuck indefinitely when the Kubernetes API keeps the stream
    open but stops delivering log data (e.g. after log rotation — see
    apache/airflow#23497).

    :param pod: pod whose container is being read.
    :param container_name: name of the container within ``pod``.
    :param stream: blocking iterable of raw log lines from the kube API.
    :return: timestamp of the last log line read if the stream finished on
        its own; ``None`` if the read was abandoned because the container
        stopped first.
    """

    async def async_await_container_completion() -> None:
        # Initial sleep gives the log reader a head start before state polling.
        await asyncio.sleep(1)
        while self.container_is_running(pod=pod, container_name=container_name):
            await asyncio.sleep(1)

    loop = asyncio.get_event_loop()
    await_container_completion = loop.create_task(async_await_container_completion())
    # The stream iterator blocks on network reads, so it must run in a worker thread.
    log_stream = asyncio.ensure_future(loop.run_in_executor(None, self.log_iterable, stream))
    tasks: Iterable[asyncio.Task] = {await_container_completion, log_stream}
    # Race the two tasks: whichever finishes first decides how we exit.
    loop.run_until_complete(asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))
    if log_stream.done():
        # Stream exhausted normally: report last timestamp seen.
        return log_stream.result()

    # Container completed while the stream is still blocked: abandon the read.
    log_stream.cancel()
    try:
        loop.run_until_complete(log_stream)
    except concurrent.futures.CancelledError:
        # NOTE(review): on Python >= 3.8, task cancellation raises
        # asyncio.CancelledError, which is no longer a subclass of
        # concurrent.futures.CancelledError — confirm this handler is actually
        # reachable on the supported Python versions.
        self.log.warning(
            "Container %s log read was interrupted at some point caused by log rotation "
            "see https://github.com/apache/airflow/issues/23497 for reference.",
            container_name,
        )
    return None

def fetch_container_logs(
self, pod: V1Pod, container_name: str, *, follow=False, since_time: Optional[DateTime] = None
) -> PodLoggingStatus:
Expand Down Expand Up @@ -220,10 +256,11 @@ def consume_logs(*, since_time: Optional[DateTime] = None, follow: bool = True)
),
follow=follow,
)
for raw_line in logs:
line = raw_line.decode('utf-8', errors="backslashreplace")
timestamp, message = self.parse_log_line(line)
self.log.info(message)
if follow:
timestamp = self.consume_container_logs_stream(pod, container_name, logs)
else:
timestamp = self.log_iterable(logs)

except BaseHTTPError as e:
self.log.warning(
"Reading of logs interrupted with error %r; will retry. "
Expand Down Expand Up @@ -256,7 +293,7 @@ def consume_logs(*, since_time: Optional[DateTime] = None, follow: bool = True)
time.sleep(1)

def await_container_completion(self, pod: V1Pod, container_name: str) -> None:
while not self.container_is_running(pod=pod, container_name=container_name):
while self.container_is_running(pod=pod, container_name=container_name):
time.sleep(1)

def await_pod_completion(self, pod: V1Pod) -> V1Pod:
Expand Down
28 changes: 26 additions & 2 deletions tests/providers/cncf/kubernetes/utils/test_pod_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# specific language governing permissions and limitations
# under the License.
import logging
import time
from typing import Generator
from unittest import mock
from unittest.mock import MagicMock

Expand Down Expand Up @@ -312,7 +314,7 @@ def test_fetch_container_since_time(self, container_running, mock_now):
args, kwargs = self.mock_kube_client.read_namespaced_pod_log.call_args_list[0]
assert kwargs['since_seconds'] == 5

@pytest.mark.parametrize('follow, is_running_calls, exp_running', [(True, 3, False), (False, 1, True)])
@pytest.mark.parametrize('follow, is_running_calls, exp_running', [(True, 4, False), (False, 1, True)])
@mock.patch('airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running')
def test_fetch_container_running_follow(
self, container_running_mock, follow, is_running_calls, exp_running
Expand All @@ -322,13 +324,35 @@ def test_fetch_container_running_follow(
When called with follow=False, should return immediately even though still running.
"""
mock_pod = MagicMock()
container_running_mock.side_effect = [True, True, False] # only will be called once
container_running_mock.side_effect = [True, False, False, False] # called once when follow=False
self.mock_kube_client.read_namespaced_pod_log.return_value = [b'2021-01-01 hi']
ret = self.pod_manager.fetch_container_logs(pod=mock_pod, container_name='base', follow=follow)
assert len(container_running_mock.call_args_list) == is_running_calls
assert ret.last_log_time == DateTime(2021, 1, 1, tzinfo=Timezone('UTC'))
assert ret.running is exp_running

@pytest.mark.parametrize('follow, is_running_calls, exp_running', [(True, 3, False)])
@mock.patch('airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running')
def test_fetch_container_running_follow_when_kube_api_hangs(
    self, container_running_mock, follow, is_running_calls, exp_running
):
    """
    When called with follow, should keep looping even after disconnections, if pod still running.
    """
    mock_pod = MagicMock()
    # Container reports "running" once, then "stopped" on subsequent polls —
    # the completion watcher should win the race against the hung stream.
    container_running_mock.side_effect = [True, False, False]

    def stream_logs() -> Generator:
        # Simulate a hung kube log stream: it never ends, and each read blocks.
        while True:
            time.sleep(1)  # this is intentional: urllib3.response.stream() is not async
            yield b'2021-01-01 hi'

    self.mock_kube_client.read_namespaced_pod_log.return_value = stream_logs()
    ret = self.pod_manager.fetch_container_logs(pod=mock_pod, container_name='base', follow=follow)
    # Completion poller ran exactly is_running_calls times before exiting.
    assert len(container_running_mock.call_args_list) == is_running_calls
    assert ret.running is exp_running
    # The abandoned stream yields no usable timestamp.
    assert ret.last_log_time is None


def params_for_test_container_is_running():
"""The `container_is_running` method is designed to handle an assortment of bad objects
Expand Down

0 comments on commit ee342b8

Please sign in to comment.