
Commit

test: simplify test a little
JoanFM committed Nov 5, 2024
1 parent 790c158 commit de375be
Showing 2 changed files with 86 additions and 83 deletions.
tests/integration/docarray_v2/test_v2.py: 4 changes (2 additions & 2 deletions)
@@ -1399,10 +1399,10 @@ def search(


@pytest.mark.parametrize(
-    'protocols', [['grpc'], ['http'], ['websocket'], ['grpc', 'http', 'websocket']]
+    'protocols', [['grpc'], ['http'], ['websocket']]
)
@pytest.mark.parametrize('reduce', [True, False])
-@pytest.mark.parametrize('sleep_time', [0.1, 5])
+@pytest.mark.parametrize('sleep_time', [5])
def test_flow_with_shards_all_shards_return(protocols, reduce, sleep_time):
    from typing import List

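Stacked pytest.mark.parametrize decorators multiply into a cartesian product, so this hunk shrinks the matrix for test_flow_with_shards_all_shards_return from 4 protocol combinations x 2 reduce values x 2 sleep times = 16 runs down to 3 x 2 x 1 = 6 runs. A minimal standalone sketch of the same stacking pattern (illustrative names only, not part of the commit):

    import pytest

    @pytest.mark.parametrize('protocols', [['grpc'], ['http'], ['websocket']])
    @pytest.mark.parametrize('reduce', [True, False])
    @pytest.mark.parametrize('sleep_time', [5])
    def test_matrix(protocols, reduce, sleep_time):
        # pytest collects the cartesian product of the three decorators,
        # i.e. 3 * 2 * 1 = 6 test items for this function
        assert isinstance(protocols, list)
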
tests/integration/network_failures/test_network_failures.py: 165 changes (84 additions & 81 deletions)
@@ -100,6 +100,89 @@ def _test_error(gateway_port, error_ports, protocol):
        assert str(port) in err_info.value.args[0]


+
+@pytest.mark.parametrize('protocol', ['grpc', 'http'])
+@pytest.mark.parametrize('fail_endpoint_discovery', [True, False])
+@pytest.mark.asyncio
+async def test_runtimes_reconnect(port_generator, protocol, fail_endpoint_discovery):
+    # create gateway and workers manually, then terminate worker process to provoke an error
+    worker_port = port_generator()
+    gateway_port = port_generator()
+    graph_description = '{"start-gateway": ["pod0"], "pod0": ["end-gateway"]}'
+    pod_addresses = f'{{"pod0": ["0.0.0.0:{worker_port}"]}}'
+
+    gateway_process = _create_gateway(
+        gateway_port, graph_description, pod_addresses, protocol
+    )
+
+    BaseServer.wait_for_ready_or_shutdown(
+        timeout=5.0,
+        ctrl_address=f'0.0.0.0:{gateway_port}',
+        ready_or_shutdown_event=multiprocessing.Event(),
+    )
+
+    try:
+        if fail_endpoint_discovery:
+            # send request while Executor is not UP, WILL FAIL
+            p = multiprocessing.Process(
+                target=_send_request, args=(gateway_port, protocol)
+            )
+            p.start()
+            p.join()
+            assert p.exitcode != 0  # The request will fail and raise
+
+        worker_process = _create_worker(worker_port)
+        assert BaseServer.wait_for_ready_or_shutdown(
+            timeout=5.0,
+            ctrl_address=f'0.0.0.0:{worker_port}',
+            ready_or_shutdown_event=multiprocessing.Event(),
+        )
+
+        p = multiprocessing.Process(target=_send_request, args=(gateway_port, protocol))
+        p.start()
+        p.join()
+        assert p.exitcode == 0  # The request will not fail and raise
+        worker_process.terminate()  # kill worker
+        worker_process.join()
+        assert not worker_process.is_alive()
+
+        # send request while Executor is not UP, WILL FAIL
+        p = multiprocessing.Process(target=_send_request, args=(gateway_port, protocol))
+        p.start()
+        p.join()
+        assert p.exitcode != 0
+
+        worker_process = _create_worker(worker_port)
+
+        assert BaseServer.wait_for_ready_or_shutdown(
+            timeout=5.0,
+            ctrl_address=f'0.0.0.0:{worker_port}',
+            ready_or_shutdown_event=multiprocessing.Event(),
+        )
+        p = multiprocessing.Process(target=_send_request, args=(gateway_port, protocol))
+        p.start()
+        p.join()
+        assert (
+            p.exitcode == 0
+        )  # if exitcode != 0 then test in other process did not pass and this should fail
+        # ----------- 2. test that gateways remain alive -----------
+        # just do the same again, expecting the same failure
+        worker_process.terminate()  # kill worker
+        worker_process.join()
+        assert not worker_process.is_alive()
+        assert (
+            worker_process.exitcode == 0
+        )  # if exitcode != 0 then test in other process did not pass and this should fail
+
+    except Exception:
+        assert False
+    finally:  # clean up runtimes
+        gateway_process.terminate()
+        gateway_process.join()
+        worker_process.terminate()
+        worker_process.join()
+
+
@pytest.mark.parametrize(
    'fail_before_endpoint_discovery', [True, False]
)  # if not before, then after
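The reconnect test added above drives each request through a child process and checks the exit code: an unhandled exception in the child (for example a connection error while the worker is down) yields a non-zero exitcode, while a successful request exits with 0. The _create_gateway, _create_worker and _send_request helpers are defined earlier in test_network_failures.py and are untouched by this commit; a rough standalone sketch of the exit-code pattern, using a stand-in helper rather than the real one, looks like this:

    import multiprocessing


    def _send_request_stub(gateway_port, protocol):
        # stand-in for the real helper: raise when the gateway is unreachable
        raise ConnectionError(f'cannot reach {protocol} gateway on port {gateway_port}')


    if __name__ == '__main__':
        p = multiprocessing.Process(target=_send_request_stub, args=(12345, 'grpc'))
        p.start()
        p.join()
        # the unhandled exception in the child maps to a non-zero exit code,
        # which is exactly what the asserts in the test inspect
        assert p.exitcode != 0
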
@@ -247,89 +330,9 @@ async def patch_process_data(self, requests_, context, **kwargs):
        worker_process.join()


-@pytest.mark.parametrize('protocol', ['grpc', 'http', 'grpc'])
-@pytest.mark.parametrize('fail_endpoint_discovery', [True, False])
-@pytest.mark.asyncio
-async def test_runtimes_reconnect(port_generator, protocol, fail_endpoint_discovery):
-    # create gateway and workers manually, then terminate worker process to provoke an error
-    worker_port = port_generator()
-    gateway_port = port_generator()
-    graph_description = '{"start-gateway": ["pod0"], "pod0": ["end-gateway"]}'
-    pod_addresses = f'{{"pod0": ["0.0.0.0:{worker_port}"]}}'
-
-    gateway_process = _create_gateway(
-        gateway_port, graph_description, pod_addresses, protocol
-    )
-
-    BaseServer.wait_for_ready_or_shutdown(
-        timeout=5.0,
-        ctrl_address=f'0.0.0.0:{gateway_port}',
-        ready_or_shutdown_event=multiprocessing.Event(),
-    )
-
-    try:
-        if fail_endpoint_discovery:
-            # send request while Executor is not UP, WILL FAIL
-            p = multiprocessing.Process(
-                target=_send_request, args=(gateway_port, protocol)
-            )
-            p.start()
-            p.join()
-            assert p.exitcode != 0  # The request will fail and raise
-
-        worker_process = _create_worker(worker_port)
-        assert BaseServer.wait_for_ready_or_shutdown(
-            timeout=5.0,
-            ctrl_address=f'0.0.0.0:{worker_port}',
-            ready_or_shutdown_event=multiprocessing.Event(),
-        )
-
-        p = multiprocessing.Process(target=_send_request, args=(gateway_port, protocol))
-        p.start()
-        p.join()
-        assert p.exitcode == 0  # The request will not fail and raise
-        worker_process.terminate()  # kill worker
-        worker_process.join()
-        assert not worker_process.is_alive()
-
-        # send request while Executor is not UP, WILL FAIL
-        p = multiprocessing.Process(target=_send_request, args=(gateway_port, protocol))
-        p.start()
-        p.join()
-        assert p.exitcode != 0
-
-        worker_process = _create_worker(worker_port)
-
-        assert BaseServer.wait_for_ready_or_shutdown(
-            timeout=5.0,
-            ctrl_address=f'0.0.0.0:{worker_port}',
-            ready_or_shutdown_event=multiprocessing.Event(),
-        )
-        p = multiprocessing.Process(target=_send_request, args=(gateway_port, protocol))
-        p.start()
-        p.join()
-        assert (
-            p.exitcode == 0
-        )  # if exitcode != 0 then test in other process did not pass and this should fail
-        # ----------- 2. test that gateways remain alive -----------
-        # just do the same again, expecting the same failure
-        worker_process.terminate()  # kill worker
-        worker_process.join()
-        assert not worker_process.is_alive()
-        assert (
-            worker_process.exitcode == 0
-        )  # if exitcode != 0 then test in other process did not pass and this should fail
-
-    except Exception:
-        assert False
-    finally:  # clean up runtimes
-        gateway_process.terminate()
-        gateway_process.join()
-        worker_process.terminate()
-        worker_process.join()


-@pytest.mark.parametrize('protocol', ['grpc', 'http', 'grpc'])
+@pytest.mark.parametrize('protocol', ['grpc', 'http'])
@pytest.mark.parametrize('fail_endpoint_discovery', [True, False])
@pytest.mark.asyncio
async def test_runtimes_reconnect_replicas(
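The final hunk removes the duplicated 'grpc' entry from the protocol parametrization of test_runtimes_reconnect_replicas, which previously made pytest collect the gRPC case twice. A small illustrative sketch (not from the repository) of how such a duplicate shows up in the collected test ids:

    import pytest


    # with the old, duplicated list pytest collects three items and disambiguates
    # the repeated value in the ids, roughly test_protocol[grpc0], test_protocol[http],
    # test_protocol[grpc1]; deduplicating leaves one grpc case and one http case
    @pytest.mark.parametrize('protocol', ['grpc', 'http', 'grpc'])
    def test_protocol(protocol):
        assert protocol in ('grpc', 'http')
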
