From 356328e223f9711078e6b31cd281713a57c1b55a Mon Sep 17 00:00:00 2001 From: Arkadiusz Bokowy Date: Mon, 2 Sep 2024 14:58:53 +0200 Subject: [PATCH] Fix asyncio forwarder --- .../fabric-admin/scripts/fabric-sync-app.py | 46 +++++++++++++------ src/python_testing/execute_python_tests.py | 2 + 2 files changed, 35 insertions(+), 13 deletions(-) diff --git a/examples/fabric-admin/scripts/fabric-sync-app.py b/examples/fabric-admin/scripts/fabric-sync-app.py index 8005346c08c8f9..0f6385fa5aec53 100755 --- a/examples/fabric-admin/scripts/fabric-sync-app.py +++ b/examples/fabric-admin/scripts/fabric-sync-app.py @@ -19,13 +19,30 @@ import os import signal import sys -import typing from argparse import ArgumentParser from tempfile import TemporaryDirectory -async def forward_f(f_in: asyncio.StreamReader, f_out: typing.BinaryIO, - prefix: bytes, cb=None): +async def asyncio_stdin() -> asyncio.StreamReader: + """Wrap sys.stdin in an asyncio StreamReader.""" + loop = asyncio.get_event_loop() + reader = asyncio.StreamReader() + protocol = asyncio.StreamReaderProtocol(reader) + await loop.connect_read_pipe(lambda: protocol, sys.stdin) + return reader + + +async def asyncio_stdout(file=sys.stdout) -> asyncio.StreamWriter: + """Wrap an IO stream in an asyncio StreamWriter.""" + loop = asyncio.get_event_loop() + transport, protocol = await loop.connect_write_pipe( + lambda: asyncio.streams.FlowControlMixin(loop=loop), + os.fdopen(file.fileno(), 'wb')) + return asyncio.streams.StreamWriter(transport, protocol, None, loop) + + +async def forward_f(prefix: bytes, f_in: asyncio.StreamReader, + f_out: asyncio.StreamWriter, cb=None): """Forward f_in to f_out with a prefix attached. This function can optionally feed received lines to a callback function. @@ -36,9 +53,9 @@ async def forward_f(f_in: asyncio.StreamReader, f_out: typing.BinaryIO, break if cb is not None: cb(line) - f_out.buffer.write(prefix) - f_out.buffer.write(line) - f_out.flush() + f_out.write(prefix) + f_out.write(line) + await f_out.drain() async def forward_pipe(pipe_path: str, f_out: asyncio.StreamWriter): @@ -62,10 +79,7 @@ async def forward_pipe(pipe_path: str, f_out: asyncio.StreamWriter): async def forward_stdin(f_out: asyncio.StreamWriter): """Forward stdin to f_out.""" - loop = asyncio.get_event_loop() - reader = asyncio.StreamReader() - protocol = asyncio.StreamReaderProtocol(reader) - await loop.connect_read_pipe(lambda: protocol, sys.stdin) + reader = await asyncio_stdin() while True: line = await reader.readline() if not line: @@ -94,9 +108,15 @@ async def run(self): stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) # Add the stdout and stderr processing to the event loop. - asyncio.create_task(forward_f(self.p.stderr, sys.stderr, self.tag)) - asyncio.create_task(forward_f(self.p.stdout, sys.stdout, self.tag, - cb=self._check_output)) + asyncio.create_task(forward_f( + self.tag, + self.p.stderr, + await asyncio_stdout(sys.stderr))) + asyncio.create_task(forward_f( + self.tag, + self.p.stdout, + await asyncio_stdout(sys.stdout), + cb=self._check_output)) async def send(self, message: str, expected_output: str = None, timeout: float = None): """Send a message to a process and optionally wait for a response.""" diff --git a/src/python_testing/execute_python_tests.py b/src/python_testing/execute_python_tests.py index d508692a842846..1b56432b447b5b 100644 --- a/src/python_testing/execute_python_tests.py +++ b/src/python_testing/execute_python_tests.py @@ -74,6 +74,8 @@ def main(search_directory, env_file): "TC_TMP_2_1.py", "TC_MCORE_FS_1_1.py", "TC_MCORE_FS_1_2.py", + "TC_MCORE_FS_1_3.py", + "TC_MCORE_FS_1_4.py", "TC_MCORE_FS_1_5.py", "TC_OCC_3_1.py", "TC_OCC_3_2.py",