From 7d12b9223506ea5668359a1d55c12e1d0eb613b8 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Mon, 5 Aug 2024 15:47:00 -0700 Subject: [PATCH] Revert back to the logic before #32561 --- airflow/utils/log/file_task_handler.py | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index 4b3e9bfff27b..0693efc24e91 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -381,28 +381,23 @@ def _read( executor_messages: list[str] = [] executor_logs: list[str] = [] served_logs: list[str] = [] - is_in_running_or_deferred = ti.state in ( - TaskInstanceState.RUNNING, - TaskInstanceState.DEFERRED, - ) - is_up_for_retry = ti.state == TaskInstanceState.UP_FOR_RETRY with suppress(NotImplementedError): remote_messages, remote_logs = self._read_remote_logs(ti, try_number, metadata) messages_list.extend(remote_messages) + has_k8s_exec_pod = False if ti.state == TaskInstanceState.RUNNING: response = self._executor_get_task_log(ti, try_number) if response: executor_messages, executor_logs = response if executor_messages: messages_list.extend(executor_messages) + has_k8s_exec_pod = True if not (remote_logs and ti.state not in State.unfinished): # when finished, if we have remote logs, no need to check local worker_log_full_path = Path(self.local_base, worker_log_rel_path) local_messages, local_logs = self._read_from_local(worker_log_full_path) messages_list.extend(local_messages) - if is_in_running_or_deferred or is_up_for_retry or not (executor_messages or remote_logs): - # While task instance is still running or deferred, look for served logs. - # And even if it's in any state, if there are no logs found yet, check served logs. + if ti.state in (TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED) and not has_k8s_exec_pod: served_messages, served_logs = self._read_from_logs_server(ti, worker_log_rel_path) messages_list.extend(served_messages) elif ti.state not in State.unfinished and not (local_logs or remote_logs): @@ -422,7 +417,7 @@ def _read( ) log_pos = len(logs) messages = "".join([f"*** {x}\n" for x in messages_list]) - end_of_log = ti.try_number != try_number or not is_in_running_or_deferred + end_of_log = ti.try_number != try_number or ti.state not in (State.RUNNING, State.DEFERRED) if metadata and "log_pos" in metadata: previous_chars = metadata["log_pos"] logs = logs[previous_chars:] # Cut off previously passed log test as new tail