diff --git a/airflow/providers/microsoft/azure/log/wasb_task_handler.py b/airflow/providers/microsoft/azure/log/wasb_task_handler.py
index aa9a0ad0cdd93..5fe96bb6a3929 100644
--- a/airflow/providers/microsoft/azure/log/wasb_task_handler.py
+++ b/airflow/providers/microsoft/azure/log/wasb_task_handler.py
@@ -21,7 +21,7 @@
 import shutil
 from functools import cached_property
 from pathlib import Path
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING
 
 from azure.core.exceptions import HttpResponseError
 
@@ -161,32 +161,6 @@ def _read_remote_logs(self, ti, try_number, metadata=None) -> tuple[list[str], l
                 self.log.exception("Could not read blob")
         return messages, logs
 
-    def _read(
-        self, ti, try_number: int, metadata: dict[str, Any] | None = None
-    ) -> tuple[str, dict[str, bool]]:
-        """
-        Read logs of given task instance and try_number from Wasb remote storage.
-
-        If failed, read the log from task instance host machine.
-
-        todo: when min airflow version >= 2.6, remove this method
-
-        :param ti: task instance object
-        :param try_number: task instance try_number to read logs from
-        :param metadata: log metadata,
-            can be used for steaming log reading and auto-tailing.
-        """
-        if hasattr(super(), "_read_remote_logs"):
-            # from Airflow 2.6, we don't implement the `_read` method.
-            # if parent has _read_remote_logs, we're >= 2.6
-            return super()._read(ti, try_number, metadata)
-
-        # below is backcompat, for airflow < 2.6
-        messages, logs = self._read_remote_logs(ti, try_number, metadata)
-        if not logs:
-            return super()._read(ti, try_number, metadata)
-        return "".join([f"*** {x}\n" for x in messages]) + "\n".join(logs), {"end_of_log": True}
-
     def wasb_log_exists(self, remote_log_location: str) -> bool:
         """
         Check if remote_log_location exists in remote storage.
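
Reviewer note: the deleted `_read` shim used runtime capability detection rather than an explicit version check — it probed the parent class for `_read_remote_logs`, which first appeared on Airflow 2.6's `FileTaskHandler`. Below is a minimal, self-contained sketch of that idiom; the class names are illustrative stand-ins, not Airflow's real classes.

    class LegacyBase:
        """Stands in for FileTaskHandler before Airflow 2.6 (no _read_remote_logs)."""

        def _read(self, ti, try_number, metadata=None):
            return "*** local fallback log\n", {"end_of_log": True}


    class ModernBase(LegacyBase):
        """Stands in for FileTaskHandler on Airflow >= 2.6."""

        def _read_remote_logs(self, ti, try_number, metadata=None):
            return [], []


    class WasbLikeHandler(ModernBase):  # inherit LegacyBase instead to exercise the old path
        def _read(self, ti, try_number, metadata=None):
            if hasattr(super(), "_read_remote_logs"):
                # Parent handles remote reads itself, so defer to it entirely.
                return super()._read(ti, try_number, metadata)
            # Pre-2.6 fallback: fetch remote logs here, else read the local file.
            messages, logs = ["found remote logs"], ["log line"]
            if not logs:
                return super()._read(ti, try_number, metadata)
            return "".join(f"*** {m}\n" for m in messages) + "\n".join(logs), {"end_of_log": True}

With the provider's minimum Airflow version now at or above 2.6, `_read_remote_logs` is guaranteed to exist on the base class, the `hasattr` check is always true, and the fallback branch is dead code — hence the whole shim can be removed.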