From f99bb20cde258bfd8404448951f5454dc6279e98 Mon Sep 17 00:00:00 2001 From: Oleg Iarygin Date: Fri, 7 Oct 2022 18:14:28 +0400 Subject: [PATCH] gh-97983: Revert "Lay the foundation for further work in asyncio.test_streams: port server cases to IsolatedAsyncioTestCase" (#98015) This PR reverts gh-93369 and gh-97896 because they've made asyncio tests unstable. After these PRs were merged, random GitHub action jobs of random commits started to fail unrelated tests and test framework methods. The reverting is necessary because such shrapnel failures are a symptom of some underlying bug that must be found and fixed first. I had a hope that it's a server overload because we already have extremely rare disc access errors. However, one and a half day passed, and the failures continue to emerge both in PRs and commits. Affected issue: gh-93357. First reported in https://github.com/python/cpython/pull/97940#issuecomment-1270004134. * Revert "gh-93357: Port test cases to IsolatedAsyncioTestCase, part 2 (#97896)" This reverts commit 09aea94d291fed2f3e96558dcd6db04014c3e2fb. * Revert "gh-93357: Start porting asyncio server test cases to IsolatedAsyncioTestCase (#93369)" This reverts commit ce8fc186ac81bce1727bf4192205148daabf5c2e. --- Lib/test/test_asyncio/test_streams.py | 326 ++++++++++++++++---------- 1 file changed, 197 insertions(+), 129 deletions(-) diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py index 61d5e984dfbfbb..0c49099bc499a5 100644 --- a/Lib/test/test_asyncio/test_streams.py +++ b/Lib/test/test_asyncio/test_streams.py @@ -566,10 +566,46 @@ def test_exception_cancel(self): test_utils.run_briefly(self.loop) self.assertIs(stream._waiter, None) - -class NewStreamTests(unittest.IsolatedAsyncioTestCase): - - async def test_start_server(self): + def test_start_server(self): + + class MyServer: + + def __init__(self, loop): + self.server = None + self.loop = loop + + async def handle_client(self, client_reader, client_writer): + data = await client_reader.readline() + client_writer.write(data) + await client_writer.drain() + client_writer.close() + await client_writer.wait_closed() + + def start(self): + sock = socket.create_server(('127.0.0.1', 0)) + self.server = self.loop.run_until_complete( + asyncio.start_server(self.handle_client, + sock=sock)) + return sock.getsockname() + + def handle_client_callback(self, client_reader, client_writer): + self.loop.create_task(self.handle_client(client_reader, + client_writer)) + + def start_callback(self): + sock = socket.create_server(('127.0.0.1', 0)) + addr = sock.getsockname() + sock.close() + self.server = self.loop.run_until_complete( + asyncio.start_server(self.handle_client_callback, + host=addr[0], port=addr[1])) + return addr + + def stop(self): + if self.server is not None: + self.server.close() + self.loop.run_until_complete(self.server.wait_closed()) + self.server = None async def client(addr): reader, writer = await asyncio.open_connection(*addr) @@ -581,43 +617,61 @@ async def client(addr): await writer.wait_closed() return msgback - async def handle_client(client_reader, client_writer): - data = await client_reader.readline() - client_writer.write(data) - await client_writer.drain() - client_writer.close() - await client_writer.wait_closed() - - with self.subTest(msg="coroutine"): - server = await asyncio.start_server( - handle_client, - host=socket_helper.HOSTv4 - ) - addr = server.sockets[0].getsockname() - msg = await client(addr) - server.close() - await server.wait_closed() - self.assertEqual(msg, b"hello world!\n") + messages = [] + self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - with self.subTest(msg="callback"): - async def handle_client_callback(client_reader, client_writer): - asyncio.get_running_loop().create_task( - handle_client(client_reader, client_writer) - ) + # test the server variant with a coroutine as client handler + server = MyServer(self.loop) + addr = server.start() + msg = self.loop.run_until_complete(self.loop.create_task(client(addr))) + server.stop() + self.assertEqual(msg, b"hello world!\n") - server = await asyncio.start_server( - handle_client_callback, - host=socket_helper.HOSTv4 - ) - addr = server.sockets[0].getsockname() - reader, writer = await asyncio.open_connection(*addr) - msg = await client(addr) - server.close() - await server.wait_closed() - self.assertEqual(msg, b"hello world!\n") + # test the server variant with a callback as client handler + server = MyServer(self.loop) + addr = server.start_callback() + msg = self.loop.run_until_complete(self.loop.create_task(client(addr))) + server.stop() + self.assertEqual(msg, b"hello world!\n") + + self.assertEqual(messages, []) @socket_helper.skip_unless_bind_unix_socket - async def test_start_unix_server(self): + def test_start_unix_server(self): + + class MyServer: + + def __init__(self, loop, path): + self.server = None + self.loop = loop + self.path = path + + async def handle_client(self, client_reader, client_writer): + data = await client_reader.readline() + client_writer.write(data) + await client_writer.drain() + client_writer.close() + await client_writer.wait_closed() + + def start(self): + self.server = self.loop.run_until_complete( + asyncio.start_unix_server(self.handle_client, + path=self.path)) + + def handle_client_callback(self, client_reader, client_writer): + self.loop.create_task(self.handle_client(client_reader, + client_writer)) + + def start_callback(self): + start = asyncio.start_unix_server(self.handle_client_callback, + path=self.path) + self.server = self.loop.run_until_complete(start) + + def stop(self): + if self.server is not None: + self.server.close() + self.loop.run_until_complete(self.server.wait_closed()) + self.server = None async def client(path): reader, writer = await asyncio.open_unix_connection(path) @@ -629,42 +683,64 @@ async def client(path): await writer.wait_closed() return msgback - async def handle_client(client_reader, client_writer): - data = await client_reader.readline() - client_writer.write(data) - await client_writer.drain() - client_writer.close() - await client_writer.wait_closed() - - with self.subTest(msg="coroutine"): - with test_utils.unix_socket_path() as path: - server = await asyncio.start_unix_server( - handle_client, - path=path - ) - msg = await client(path) - server.close() - await server.wait_closed() - self.assertEqual(msg, b"hello world!\n") - - with self.subTest(msg="callback"): - async def handle_client_callback(client_reader, client_writer): - asyncio.get_running_loop().create_task( - handle_client(client_reader, client_writer) - ) - - with test_utils.unix_socket_path() as path: - server = await asyncio.start_unix_server( - handle_client_callback, - path=path - ) - msg = await client(path) - server.close() - await server.wait_closed() - self.assertEqual(msg, b"hello world!\n") + messages = [] + self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) + + # test the server variant with a coroutine as client handler + with test_utils.unix_socket_path() as path: + server = MyServer(self.loop, path) + server.start() + msg = self.loop.run_until_complete( + self.loop.create_task(client(path))) + server.stop() + self.assertEqual(msg, b"hello world!\n") + + # test the server variant with a callback as client handler + with test_utils.unix_socket_path() as path: + server = MyServer(self.loop, path) + server.start_callback() + msg = self.loop.run_until_complete( + self.loop.create_task(client(path))) + server.stop() + self.assertEqual(msg, b"hello world!\n") + + self.assertEqual(messages, []) @unittest.skipIf(ssl is None, 'No ssl module') - async def test_start_tls(self): + def test_start_tls(self): + + class MyServer: + + def __init__(self, loop): + self.server = None + self.loop = loop + + async def handle_client(self, client_reader, client_writer): + data1 = await client_reader.readline() + client_writer.write(data1) + await client_writer.drain() + assert client_writer.get_extra_info('sslcontext') is None + await client_writer.start_tls( + test_utils.simple_server_sslcontext()) + assert client_writer.get_extra_info('sslcontext') is not None + data2 = await client_reader.readline() + client_writer.write(data2) + await client_writer.drain() + client_writer.close() + await client_writer.wait_closed() + + def start(self): + sock = socket.create_server(('127.0.0.1', 0)) + self.server = self.loop.run_until_complete( + asyncio.start_server(self.handle_client, + sock=sock)) + return sock.getsockname() + + def stop(self): + if self.server is not None: + self.server.close() + self.loop.run_until_complete(self.server.wait_closed()) + self.server = None async def client(addr): reader, writer = await asyncio.open_connection(*addr) @@ -681,48 +757,17 @@ async def client(addr): await writer.wait_closed() return msgback1, msgback2 - async def handle_client(client_reader, client_writer): - data1 = await client_reader.readline() - client_writer.write(data1) - await client_writer.drain() - assert client_writer.get_extra_info('sslcontext') is None - await client_writer.start_tls( - test_utils.simple_server_sslcontext()) - assert client_writer.get_extra_info('sslcontext') is not None - - data2 = await client_reader.readline() - client_writer.write(data2) - await client_writer.drain() - client_writer.close() - await client_writer.wait_closed() - - server = await asyncio.start_server( - handle_client, - host=socket_helper.HOSTv4 - ) - addr = server.sockets[0].getsockname() - - msg1, msg2 = await client(addr) - server.close() - await server.wait_closed() - self.assertEqual(msg1, b"hello world 1!\n") - self.assertEqual(msg2, b"hello world 2!\n") - - -class StreamTests2(test_utils.TestCase): - - def setUp(self): - super().setUp() - self.loop = asyncio.new_event_loop() - self.set_event_loop(self.loop) + messages = [] + self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - def tearDown(self): - # just in case if we have transport close callbacks - test_utils.run_briefly(self.loop) + server = MyServer(self.loop) + addr = server.start() + msg1, msg2 = self.loop.run_until_complete(client(addr)) + server.stop() - self.loop.close() - gc.collect() - super().tearDown() + self.assertEqual(messages, []) + self.assertEqual(msg1, b"hello world 1!\n") + self.assertEqual(msg2, b"hello world 2!\n") @unittest.skipIf(sys.platform == 'win32', "Don't have pipes") def test_read_all_from_pipe_reader(self): @@ -941,32 +986,36 @@ def test_LimitOverrunError_pickleable(self): self.assertEqual(str(e), str(e2)) self.assertEqual(e.consumed, e2.consumed) -class NewStreamTests2(unittest.IsolatedAsyncioTestCase): - async def test_wait_closed_on_close(self): + def test_wait_closed_on_close(self): with test_utils.run_test_server() as httpd: - rd, wr = await asyncio.open_connection(*httpd.address) + rd, wr = self.loop.run_until_complete( + asyncio.open_connection(*httpd.address)) wr.write(b'GET / HTTP/1.0\r\n\r\n') - data = await rd.readline() + f = rd.readline() + data = self.loop.run_until_complete(f) self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') - data = await rd.read() + f = rd.read() + data = self.loop.run_until_complete(f) self.assertTrue(data.endswith(b'\r\n\r\nTest message')) self.assertFalse(wr.is_closing()) wr.close() self.assertTrue(wr.is_closing()) - await wr.wait_closed() + self.loop.run_until_complete(wr.wait_closed()) - async def test_wait_closed_on_close_with_unread_data(self): + def test_wait_closed_on_close_with_unread_data(self): with test_utils.run_test_server() as httpd: - rd, wr = await asyncio.open_connection(*httpd.address) + rd, wr = self.loop.run_until_complete( + asyncio.open_connection(*httpd.address)) wr.write(b'GET / HTTP/1.0\r\n\r\n') - data = await rd.readline() + f = rd.readline() + data = self.loop.run_until_complete(f) self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') wr.close() - await wr.wait_closed() + self.loop.run_until_complete(wr.wait_closed()) - async def test_async_writer_api(self): + def test_async_writer_api(self): async def inner(httpd): rd, wr = await asyncio.open_connection(*httpd.address) @@ -978,10 +1027,15 @@ async def inner(httpd): wr.close() await wr.wait_closed() + messages = [] + self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) + with test_utils.run_test_server() as httpd: - await inner(httpd) + self.loop.run_until_complete(inner(httpd)) - async def test_async_writer_api_exception_after_close(self): + self.assertEqual(messages, []) + + def test_async_writer_api_exception_after_close(self): async def inner(httpd): rd, wr = await asyncio.open_connection(*httpd.address) @@ -995,19 +1049,33 @@ async def inner(httpd): wr.write(b'data') await wr.drain() + messages = [] + self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) + with test_utils.run_test_server() as httpd: - await inner(httpd) + self.loop.run_until_complete(inner(httpd)) - async def test_eof_feed_when_closing_writer(self): + self.assertEqual(messages, []) + + def test_eof_feed_when_closing_writer(self): # See http://bugs.python.org/issue35065 + messages = [] + self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) + with test_utils.run_test_server() as httpd: - rd, wr = await asyncio.open_connection(*httpd.address) + rd, wr = self.loop.run_until_complete( + asyncio.open_connection(*httpd.address)) + wr.close() - await wr.wait_closed() + f = wr.wait_closed() + self.loop.run_until_complete(f) self.assertTrue(rd.at_eof()) - data = await rd.read() + f = rd.read() + data = self.loop.run_until_complete(f) self.assertEqual(data, b'') + self.assertEqual(messages, []) + if __name__ == '__main__': unittest.main()