From de375be72141d3f41546f04c1b31b407047e9049 Mon Sep 17 00:00:00 2001 From: Joan Martinez Date: Tue, 5 Nov 2024 17:12:29 +0100 Subject: [PATCH] test: simplify test a little --- tests/integration/docarray_v2/test_v2.py | 4 +- .../network_failures/test_network_failures.py | 165 +++++++++--------- 2 files changed, 86 insertions(+), 83 deletions(-) diff --git a/tests/integration/docarray_v2/test_v2.py b/tests/integration/docarray_v2/test_v2.py index f03fa4ddb9caf..9a2b89add3c39 100644 --- a/tests/integration/docarray_v2/test_v2.py +++ b/tests/integration/docarray_v2/test_v2.py @@ -1399,10 +1399,10 @@ def search( @pytest.mark.parametrize( - 'protocols', [['grpc'], ['http'], ['websocket'], ['grpc', 'http', 'websocket']] + 'protocols', [['grpc'], ['http'], ['websocket']] ) @pytest.mark.parametrize('reduce', [True, False]) -@pytest.mark.parametrize('sleep_time', [0.1, 5]) +@pytest.mark.parametrize('sleep_time', [5]) def test_flow_with_shards_all_shards_return(protocols, reduce, sleep_time): from typing import List diff --git a/tests/integration/network_failures/test_network_failures.py b/tests/integration/network_failures/test_network_failures.py index 288275f917b6c..dca11243786c7 100644 --- a/tests/integration/network_failures/test_network_failures.py +++ b/tests/integration/network_failures/test_network_failures.py @@ -100,6 +100,89 @@ def _test_error(gateway_port, error_ports, protocol): assert str(port) in err_info.value.args[0] + +@pytest.mark.parametrize('protocol', ['grpc', 'http']) +@pytest.mark.parametrize('fail_endpoint_discovery', [True, False]) +@pytest.mark.asyncio +async def test_runtimes_reconnect(port_generator, protocol, fail_endpoint_discovery): + # create gateway and workers manually, then terminate worker process to provoke an error + worker_port = port_generator() + gateway_port = port_generator() + graph_description = '{"start-gateway": ["pod0"], "pod0": ["end-gateway"]}' + pod_addresses = f'{{"pod0": ["0.0.0.0:{worker_port}"]}}' + + gateway_process = _create_gateway( + gateway_port, graph_description, pod_addresses, protocol + ) + + BaseServer.wait_for_ready_or_shutdown( + timeout=5.0, + ctrl_address=f'0.0.0.0:{gateway_port}', + ready_or_shutdown_event=multiprocessing.Event(), + ) + + try: + if fail_endpoint_discovery: + # send request while Executor is not UP, WILL FAIL + p = multiprocessing.Process( + target=_send_request, args=(gateway_port, protocol) + ) + p.start() + p.join() + assert p.exitcode != 0 # The request will fail and raise + + worker_process = _create_worker(worker_port) + assert BaseServer.wait_for_ready_or_shutdown( + timeout=5.0, + ctrl_address=f'0.0.0.0:{worker_port}', + ready_or_shutdown_event=multiprocessing.Event(), + ) + + p = multiprocessing.Process(target=_send_request, args=(gateway_port, protocol)) + p.start() + p.join() + assert p.exitcode == 0 # The request will not fail and raise + worker_process.terminate() # kill worker + worker_process.join() + assert not worker_process.is_alive() + + # send request while Executor is not UP, WILL FAIL + p = multiprocessing.Process(target=_send_request, args=(gateway_port, protocol)) + p.start() + p.join() + assert p.exitcode != 0 + + worker_process = _create_worker(worker_port) + + assert BaseServer.wait_for_ready_or_shutdown( + timeout=5.0, + ctrl_address=f'0.0.0.0:{worker_port}', + ready_or_shutdown_event=multiprocessing.Event(), + ) + p = multiprocessing.Process(target=_send_request, args=(gateway_port, protocol)) + p.start() + p.join() + assert ( + p.exitcode == 0 + ) # if exitcode != 0 then test in other process did not pass and this should fail + # ----------- 2. test that gateways remain alive ----------- + # just do the same again, expecting the same failure + worker_process.terminate() # kill worker + worker_process.join() + assert not worker_process.is_alive() + assert ( + worker_process.exitcode == 0 + ) # if exitcode != 0 then test in other process did not pass and this should fail + + except Exception: + assert False + finally: # clean up runtimes + gateway_process.terminate() + gateway_process.join() + worker_process.terminate() + worker_process.join() + + @pytest.mark.parametrize( 'fail_before_endpoint_discovery', [True, False] ) # if not before, then after @@ -247,89 +330,9 @@ async def patch_process_data(self, requests_, context, **kwargs): worker_process.join() -@pytest.mark.parametrize('protocol', ['grpc', 'http', 'grpc']) -@pytest.mark.parametrize('fail_endpoint_discovery', [True, False]) -@pytest.mark.asyncio -async def test_runtimes_reconnect(port_generator, protocol, fail_endpoint_discovery): - # create gateway and workers manually, then terminate worker process to provoke an error - worker_port = port_generator() - gateway_port = port_generator() - graph_description = '{"start-gateway": ["pod0"], "pod0": ["end-gateway"]}' - pod_addresses = f'{{"pod0": ["0.0.0.0:{worker_port}"]}}' - - gateway_process = _create_gateway( - gateway_port, graph_description, pod_addresses, protocol - ) - - BaseServer.wait_for_ready_or_shutdown( - timeout=5.0, - ctrl_address=f'0.0.0.0:{gateway_port}', - ready_or_shutdown_event=multiprocessing.Event(), - ) - - try: - if fail_endpoint_discovery: - # send request while Executor is not UP, WILL FAIL - p = multiprocessing.Process( - target=_send_request, args=(gateway_port, protocol) - ) - p.start() - p.join() - assert p.exitcode != 0 # The request will fail and raise - - worker_process = _create_worker(worker_port) - assert BaseServer.wait_for_ready_or_shutdown( - timeout=5.0, - ctrl_address=f'0.0.0.0:{worker_port}', - ready_or_shutdown_event=multiprocessing.Event(), - ) - - p = multiprocessing.Process(target=_send_request, args=(gateway_port, protocol)) - p.start() - p.join() - assert p.exitcode == 0 # The request will not fail and raise - worker_process.terminate() # kill worker - worker_process.join() - assert not worker_process.is_alive() - - # send request while Executor is not UP, WILL FAIL - p = multiprocessing.Process(target=_send_request, args=(gateway_port, protocol)) - p.start() - p.join() - assert p.exitcode != 0 - - worker_process = _create_worker(worker_port) - - assert BaseServer.wait_for_ready_or_shutdown( - timeout=5.0, - ctrl_address=f'0.0.0.0:{worker_port}', - ready_or_shutdown_event=multiprocessing.Event(), - ) - p = multiprocessing.Process(target=_send_request, args=(gateway_port, protocol)) - p.start() - p.join() - assert ( - p.exitcode == 0 - ) # if exitcode != 0 then test in other process did not pass and this should fail - # ----------- 2. test that gateways remain alive ----------- - # just do the same again, expecting the same failure - worker_process.terminate() # kill worker - worker_process.join() - assert not worker_process.is_alive() - assert ( - worker_process.exitcode == 0 - ) # if exitcode != 0 then test in other process did not pass and this should fail - - except Exception: - assert False - finally: # clean up runtimes - gateway_process.terminate() - gateway_process.join() - worker_process.terminate() - worker_process.join() -@pytest.mark.parametrize('protocol', ['grpc', 'http', 'grpc']) +@pytest.mark.parametrize('protocol', ['grpc', 'http']) @pytest.mark.parametrize('fail_endpoint_discovery', [True, False]) @pytest.mark.asyncio async def test_runtimes_reconnect_replicas(