From 24811f729f0e20dfff1be9afa8bf4a60b44fe628 Mon Sep 17 00:00:00 2001 From: Ash Berlin-Taylor Date: Tue, 19 Nov 2024 13:42:33 +0000 Subject: [PATCH] Make task output "unbuffered" so output is captured straight away (#44186) Without this change a dag like this: ``` @task() def hello(): print("hello") time.sleep(300) print("goodbye") ``` would not show the output for "hello" until after the sleep! This is analogous to setting the PYTHONUNBUFFERED environment variable when running something like `python script.py | cat` etc. --- .../airflow/sdk/execution_time/supervisor.py | 5 ++-- .../tests/execution_time/test_supervisor.py | 28 +++++++++---------- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/task_sdk/src/airflow/sdk/execution_time/supervisor.py b/task_sdk/src/airflow/sdk/execution_time/supervisor.py index 7faddebb25c53..f2715ad3e5da9 100644 --- a/task_sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task_sdk/src/airflow/sdk/execution_time/supervisor.py @@ -149,8 +149,9 @@ def _reopen_std_io_handles(child_stdin, child_stdout, child_stderr): fd = sock.fileno() else: raise - - setattr(sys, handle_name, os.fdopen(fd, mode)) + # We can't open text mode fully unbuffered (python throws an exception if we try), but we can make it line buffered with `buffering=1` + handle = os.fdopen(fd, mode, buffering=1) + setattr(sys, handle_name, handle) def _fork_main( diff --git a/task_sdk/tests/execution_time/test_supervisor.py b/task_sdk/tests/execution_time/test_supervisor.py index 5ed51fece51aa..edd62d1722259 100644 --- a/task_sdk/tests/execution_time/test_supervisor.py +++ b/task_sdk/tests/execution_time/test_supervisor.py @@ -51,30 +51,30 @@ def test_reading_from_pipes(self, captured_logs, time_machine): # Ignore anything lower than INFO for this test. Captured_logs resets things for us afterwards structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(logging.INFO)) - line = lineno() - def subprocess_main(): # This is run in the subprocess! 
# Ensure we follow the "protocol" and get the startup message before we do anything sys.stdin.readline() - # Flush calls are to ensure ordering of output for predictable tests import logging import warnings print("I'm a short message") sys.stdout.write("Message ") - sys.stdout.write("split across two writes\n") - sys.stdout.flush() - print("stderr message", file=sys.stderr) - sys.stderr.flush() + # We need a short sleep for the main process to process things. I worry this timing will be + # fragile, but I can't think of a better way. This lets the stdout be read (partial line) and the + # stderr full line be read + sleep(0.1) + sys.stdout.write("split across two writes\n") logging.getLogger("airflow.foobar").error("An error message") warnings.warn("Warning should be captured too", stacklevel=1) + line = lineno() - 2 # Line the error should be on + instant = tz.datetime(2024, 11, 7, 12, 34, 56, 78901) time_machine.move_to(instant, tick=False) @@ -103,16 +103,16 @@ def subprocess_main(): "timestamp": "2024-11-07T12:34:56.078901Z", }, { - "chan": "stdout", - "event": "Message split across two writes", - "level": "info", + "chan": "stderr", + "event": "stderr message", + "level": "error", "logger": "task", "timestamp": "2024-11-07T12:34:56.078901Z", }, { - "chan": "stderr", - "event": "stderr message", - "level": "error", + "chan": "stdout", + "event": "Message split across two writes", + "level": "info", "logger": "task", "timestamp": "2024-11-07T12:34:56.078901Z", }, @@ -127,7 +127,7 @@ def subprocess_main(): "event": "Warning should be captured too", "filename": __file__, "level": "warning", - "lineno": line + 22, + "lineno": line, "logger": "py.warnings", "timestamp": instant.replace(tzinfo=None), },