Fix k8s pod.execute randomly stuck indefinitely by logs consumption (#23497) #23618

Merged: 6 commits, May 11, 2022
Changes from 1 commit
39 changes: 35 additions & 4 deletions in airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
"""Launches PODs"""
import asyncio
import json
import math
import time
@@ -193,6 +194,35 @@ def follow_container_logs(self, pod: V1Pod, container_name: str) -> PodLoggingStatus:
        )
        return self.fetch_container_logs(pod=pod, container_name=container_name, follow=True)

    def log_iterable(self, logs: Iterable[bytes]) -> Optional[DateTime]:
        timestamp = None  # preserved if the stream yields no lines
        for line in logs:
            timestamp, message = self.parse_log_line(line.decode('utf-8'))
            self.log.info(message)
        return timestamp
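The parse_log_line helper used above is outside this hunk. Purely as an illustration of what such a splitter does (Airflow's real implementation lives elsewhere in pod_manager.py and uses pendulum, so this is a sketch, not the actual code), a stdlib-only version for k8s-style timestamped lines might be:

```python
from datetime import datetime
from typing import Optional, Tuple

def parse_log_line(line: str) -> Tuple[Optional[datetime], str]:
    """Split a k8s log line produced with timestamps=True into (timestamp, message)."""
    # With timestamps=True the API prefixes each line with an RFC3339 timestamp,
    # e.g. "2022-05-10T12:00:00.123456Z actual message".
    split_at = line.find(' ')
    if split_at == -1:
        return None, line
    raw_ts, message = line[:split_at], line[split_at + 1:]
    try:
        # fromisoformat (pre-3.11) needs the offset spelled out, not "Z".
        timestamp = datetime.fromisoformat(raw_ts.replace('Z', '+00:00'))
    except ValueError:
        return None, line
    return timestamp, message
```

If parsing fails, the whole line is treated as the message, which is the defensive behaviour one would want in a log follower.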

    def consume_container_logs_stream(
        self, pod: V1Pod, container_name: str, stream: Iterable[bytes]
    ) -> Optional[DateTime]:
        async def consume_log_stream() -> Optional[DateTime]:
            return self.log_iterable(stream)

        async def async_await_container_completion() -> None:
            self.await_container_completion(pod=pod, container_name=container_name)

        await_container_completion = asyncio.create_task(async_await_container_completion())
        log_stream = asyncio.create_task(consume_log_stream())
        asyncio.run(
            asyncio.wait({log_stream, await_container_completion}, return_when=asyncio.FIRST_COMPLETED)
        )
Member:
We have not used asyncio in regular operators yet, but I see no reason why we should not start, since we are already in the Python 3.7+ world - especially for the k8s provider, which is Airflow 2.3.0+ only.

@dstandish @andrewgodwin - WDYT? I know you are likely away, but in case you can take a look, I'd love to merge this for the providers round I want to release now, as it seems to address an annoying problem people might experience.

Let me know if you can take a look, but it LGTM.


        if log_stream.done():
            return log_stream.result()

        log_stream.cancel()
        self.log.warning(
            "Pod %s log read was interrupted at some point caused by log rotation "
@schattian (Contributor Author), May 10, 2022:
We could do better here by adding a call to fetch_container_logs(follow=False) to consume the logs we lost after the rotation happened.

However, that would add a bit of (imo) unnecessary overhead, considering:

  1. we don't expect this to happen with future versions of k8s that have this API fixed.
  2. if the pod was deleted for some reason, the log produced would be misleading.

Member:
Agree.

            "see https://github.com/kubernetes/kubernetes/issues/59902 "
            "and https://github.com/apache/airflow/issues/23497 for reference. "
            "Container name: %s.",
            pod.metadata.name,
            container_name,
        )
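The method above races blocking log consumption against container completion. One caveat with the snippet as written is that asyncio.create_task requires an already running event loop. A self-contained sketch of the same racing pattern (generic names, not Airflow's API, and not code from this PR) that pushes the two blocking calls onto executor threads so they genuinely run concurrently:

```python
import asyncio
import time

def consume(stream):
    # Blocking consumption of a (possibly endless) log stream; returns the last item.
    last = None
    for item in stream:
        last = item
    return last

def await_completion(flag):
    # Blocking poll until the "container" reports it is finished.
    while not flag["done"]:
        time.sleep(0.05)

def race(stream, flag):
    loop = asyncio.new_event_loop()
    try:
        # run_in_executor moves the blocking calls onto worker threads, so
        # whichever finishes first unblocks the wait() below.
        log_task = loop.run_in_executor(None, consume, stream)
        done_task = loop.run_in_executor(None, await_completion, flag)
        loop.run_until_complete(
            asyncio.wait({log_task, done_task}, return_when=asyncio.FIRST_COMPLETED)
        )
        if log_task.done():
            return log_task.result()
        log_task.cancel()  # stream was cut short (e.g. log rotation)
        return None
    finally:
        loop.close()
```

With a finite stream the log task wins the race and its result is returned; with a hung stream the completion poller wins and the log task is cancelled, mirroring the warning branch above.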

    def fetch_container_logs(
        self, pod: V1Pod, container_name: str, *, follow=False, since_time: Optional[DateTime] = None
    ) -> PodLoggingStatus:
@@ -220,9 +250,10 @@ def consume_logs(*, since_time: Optional[DateTime] = None, follow: bool = True)
                    ),
                    follow=follow,
                )
-               for line in logs:
-                   timestamp, message = self.parse_log_line(line.decode('utf-8'))
-                   self.log.info(message)
+               if follow:
+                   timestamp = self.consume_container_logs_stream(pod, container_name, logs)
+               else:
+                   timestamp = self.log_iterable(logs)
            except BaseHTTPError as e:
                self.log.warning(
                    "Reading of logs interrupted with error %r; will retry. "
@@ -255,7 +286,7 @@ def consume_logs(*, since_time: Optional[DateTime] = None, follow: bool = True)
                time.sleep(1)
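The since_seconds expression in this hunk, math.ceil((pendulum.now() - since_time).total_seconds()) if since_time else None, is easy to get subtly wrong (rounding direction, missing timezone). A stdlib-only equivalent with an injectable clock, purely for illustration:

```python
import math
from datetime import datetime, timezone
from typing import Optional

def since_seconds(since_time: Optional[datetime], now: Optional[datetime] = None) -> Optional[int]:
    # Round *up* so the re-fetch window always covers the last seen line.
    if since_time is None:
        return None
    now = now or datetime.now(timezone.utc)
    return math.ceil((now - since_time).total_seconds())
```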

    def await_container_completion(self, pod: V1Pod, container_name: str) -> None:
-       while not self.container_is_running(pod=pod, container_name=container_name):
+       while self.container_is_running(pod=pod, container_name=container_name):
@schattian (Contributor Author), May 10, 2022:
That is the change unrelated to the issue.
I think the logic was reversed: it blocked until the container was running, instead of blocking until it terminated.

Tests all mock this function, so they did not detect it.

It's probably not a big deal, as the operator's default is get_logs=True, so this was unused (until now - I'm using it in fetch_container_logs).

I don't know if this is out of the scope of this PR and should be addressed separately - which guideline do you follow for these cases?

Member:
It does look suspiciously wrong indeed. Since this is used in your fix, I see no problem with having it as part of the PR.

            time.sleep(1)
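As the thread notes, the reversed condition went unnoticed because tests mocked the whole function. A dependency-free miniature of the corrected loop (names here are illustrative, not Airflow's) shows how an unmocked test pins down the polarity:

```python
import time

def await_completion(is_running, poll_interval=0.0):
    # Correct: block while the container is still running, i.e. until it
    # terminates. The pre-fix version used `while not is_running()`, which
    # blocked until the container *started* and returned while it was running.
    while is_running():
        time.sleep(poll_interval)
```

Driving it with a scripted sequence of states (running, running, terminated) verifies that the loop polls exactly until the first False.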

def await_pod_completion(self, pod: V1Pod) -> V1Pod: