Skip to content

Commit

Permalink
Fix timestamp parse failure for k8s executor pod tailing (#31175)
Browse files Browse the repository at this point in the history
  • Loading branch information
dstandish authored May 11, 2023
1 parent 8a5fe6a commit 86d62d3
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 7 deletions.
9 changes: 5 additions & 4 deletions airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
from airflow.kubernetes.pod_generator import PodGenerator
from airflow.models.taskinstance import TaskInstance
from airflow.utils.event_scheduler import EventScheduler
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.log.logging_mixin import LoggingMixin, remove_escape_codes
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import State, TaskInstanceState

Expand Down Expand Up @@ -794,7 +794,7 @@ def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], li

client = get_kube_client()

messages.append(f"Trying to get logs (last 100 lines) from worker pod {ti.hostname}")
messages.append(f"Attempting to fetch logs from pod {ti.hostname} through kube API")
selector = PodGenerator.build_selector_for_k8s_executor_pod(
dag_id=ti.dag_id,
task_id=ti.task_id,
Expand All @@ -820,9 +820,10 @@ def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], li
tail_lines=100,
_preload_content=False,
)

for line in res:
log.append(line.decode())
log.append(remove_escape_codes(line.decode()))
if log:
messages.append("Found logs through kube API")
except Exception as e:
messages.append(f"Reading from k8s pod logs failed: {str(e)}")
return messages, ["\n".join(log)]
Expand Down
2 changes: 1 addition & 1 deletion airflow/utils/log/file_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ def _read(
if response:
executor_messages, executor_logs = response
if executor_messages:
messages_list.extend(messages_list)
messages_list.extend(executor_messages)
if not (remote_logs and ti.state not in State.unfinished):
# when finished, if we have remote logs, no need to check local
worker_log_full_path = Path(self.local_base, worker_log_rel_path)
Expand Down
7 changes: 5 additions & 2 deletions tests/executors/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1140,7 +1140,10 @@ def test_get_task_log(self, mock_get_kube_client, create_task_instance_of_operat
messages, logs = executor.get_task_log(ti=ti, try_number=1)

mock_kube_client.read_namespaced_pod_log.assert_called_once()
assert "Trying to get logs (last 100 lines) from worker pod " in messages
assert messages == [
"Attempting to fetch logs from pod through kube API",
"Found logs through kube API",
]
assert logs[0] == "a_\nb_\nc_"

mock_kube_client.reset_mock()
Expand All @@ -1149,7 +1152,7 @@ def test_get_task_log(self, mock_get_kube_client, create_task_instance_of_operat
messages, logs = executor.get_task_log(ti=ti, try_number=1)
assert logs == [""]
assert messages == [
"Trying to get logs (last 100 lines) from worker pod ",
"Attempting to fetch logs from pod through kube API",
"Reading from k8s pod logs failed: error_fetching_pod_log",
]

Expand Down

0 comments on commit 86d62d3

Please sign in to comment.