Skip to content

Commit

Permalink
some cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg committed Nov 30, 2022
1 parent 265e5ae commit d960b19
Showing 1 changed file with 109 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from asyncio import Future
from copy import deepcopy
from pathlib import Path
from typing import Any, AsyncIterator, Callable
from typing import Any, AsyncIterator, Awaitable, Callable
from unittest import mock
from unittest.mock import call

Expand Down Expand Up @@ -45,6 +45,7 @@
from simcore_service_webserver.resource_manager.plugin import setup_resource_manager
from simcore_service_webserver.resource_manager.registry import (
RedisResourceRegistry,
RegistryKeyPrefixDict,
get_registry,
)
from simcore_service_webserver.rest import setup_rest
Expand All @@ -57,7 +58,8 @@
from simcore_service_webserver.users_api import delete_user
from simcore_service_webserver.users_exceptions import UserNotFoundError
from tenacity._asyncio import AsyncRetrying
from tenacity.stop import stop_after_attempt, stop_after_delay
from tenacity.retry import retry_if_exception_type
from tenacity.stop import stop_after_delay
from tenacity.wait import wait_fixed
from yarl import URL

Expand All @@ -67,18 +69,33 @@
SERVICE_DELETION_DELAY = 1


async def open_project(client, project_uuid: str, client_session_id: str) -> None:
url = client.app.router["open_project"].url_for(project_id=project_uuid)
resp = await client.post(url, json=client_session_id)
await assert_status(resp, web.HTTPOk)


async def close_project(client, project_uuid: str, client_session_id: str) -> None:
url = client.app.router["close_project"].url_for(project_id=project_uuid)
resp = await client.post(url, json=client_session_id)
await assert_status(resp, web.HTTPNoContent)


@pytest.fixture
async def open_project() -> AsyncIterator[Callable[..., Awaitable[None]]]:
opened_projects = []

async def open_project(client, project_uuid: str, client_session_id: str) -> None:
url = client.app.router["open_project"].url_for(project_id=project_uuid)
resp = await client.post(url, json=client_session_id)
await assert_status(resp, web.HTTPOk)
opened_projects.append((client, project_uuid, client_session_id))

yield open_project
# cleanup, if we cannot close that is because the user_role might not allow it
await asyncio.gather(
*(
close_project(client, project_uuid, client_session_id)
for client, project_uuid, client_session_id in opened_projects
),
return_exceptions=True,
)


@pytest.fixture
def client(
event_loop: asyncio.AbstractEventLoop,
Expand Down Expand Up @@ -246,11 +263,10 @@ async def test_websocket_resource_management(
cur_client_session_id = client_session_id_factory()
sio = await socketio_client_factory(cur_client_session_id)
sid = sio.get_sid()
resource_key = {
"user_id": str(logged_user["id"]),
"client_session_id": cur_client_session_id,
}
# FIXME: this check fails with python-socketio>=5.0.0 (see requirements/_base.in)
resource_key = RegistryKeyPrefixDict(
user_id=f"{logged_user['id']}", client_session_id=cur_client_session_id
)

assert await socket_registry.find_keys(("socket_id", sio.get_sid())) == [
resource_key
]
Expand All @@ -261,13 +277,17 @@ async def test_websocket_resource_management(

# NOTE: the socket.io client needs the websockets package in order to upgrade to websocket transport
await sio.disconnect()
await sio.wait()
assert not sio.get_sid()
# NOTE: let the disconnection propagate
await asyncio.sleep(1)
# now the entries should be removed
assert not await socket_registry.find_keys(("socket_id", sio.get_sid()))
assert not sid in await socket_registry.find_resources(resource_key, "socket_id")
assert not await socket_registry.find_resources(resource_key, "socket_id")

async for attempt in AsyncRetrying(**_TENACITY_ASSERT_RETRY):
with attempt:
# now the entries should be removed
assert not await socket_registry.find_keys(("socket_id", sio.get_sid()))
assert not sid in await socket_registry.find_resources(
resource_key, "socket_id"
)
assert not await socket_registry.find_resources(resource_key, "socket_id")


@pytest.mark.parametrize(
Expand All @@ -286,19 +306,20 @@ async def test_websocket_multiple_connections(
client_session_id_factory: Callable[[], str],
):
NUMBER_OF_SOCKETS = 5
resource_key = {}
resource_keys: list[RegistryKeyPrefixDict] = []

# connect multiple clients
clients = []
for socket_count in range(1, NUMBER_OF_SOCKETS + 1):
cur_client_session_id = client_session_id_factory()
sio = await socketio_client_factory(cur_client_session_id)
resource_key = {
"user_id": str(logged_user["id"]),
"client_session_id": cur_client_session_id,
}
assert await socket_registry.find_keys(("socket_id", sio.sid)) == [resource_key]
assert [sio.sid] == await socket_registry.find_resources(
resource_key = RegistryKeyPrefixDict(
user_id=f"{logged_user['id']}", client_session_id=cur_client_session_id
)
assert await socket_registry.find_keys(("socket_id", sio.get_sid())) == [
resource_key
]
assert [sio.get_sid()] == await socket_registry.find_resources(
resource_key, "socket_id"
)
assert (
Expand All @@ -311,21 +332,29 @@ async def test_websocket_multiple_connections(
== socket_count
)
clients.append(sio)
resource_keys.append(resource_key)

for sio in clients:
sid = sio.sid
for sio, resource_key in zip(clients, resource_keys):
sid = sio.get_sid()
await sio.disconnect()
# need to attend the disconnect event to pass through the socketio internal queues
await asyncio.sleep(
0.1
) # must be >= 0.01 to work without issues, added some padding
await sio.wait()

assert not sio.sid
assert not await socket_registry.find_keys(("socket_id", sio.sid))
assert not await socket_registry.find_keys(("socket_id", sio.get_sid()))
assert not sid in await socket_registry.find_resources(
resource_key, "socket_id"
)

assert not await socket_registry.find_resources(resource_key, "socket_id")
for resource_key in resource_keys:
assert not await socket_registry.find_resources(resource_key, "socket_id")


_TENACITY_ASSERT_RETRY = dict(
reraise=True,
retry=retry_if_exception_type(AssertionError),
wait=wait_fixed(0.5),
stop=stop_after_delay(30),
)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -376,21 +405,20 @@ async def test_websocket_disconnected_after_logout(
await assert_status(r, expected)

# the socket2 should be gone
await asyncio.sleep(1)
assert not sio2.sid
socket_logout_mock_callable2.assert_not_called()
async for attempt in AsyncRetrying(**_TENACITY_ASSERT_RETRY):
with attempt:
assert not sio2.sid
socket_logout_mock_callable2.assert_not_called()

# the others should receive a logout message through their respective sockets
await asyncio.sleep(3)
socket_logout_mock_callable.assert_called_once()
socket_logout_mock_callable2.assert_not_called() # note 2 should be not called ever
socket_logout_mock_callable3.assert_called_once()
# the others should receive a logout message through their respective sockets
socket_logout_mock_callable.assert_called_once()
socket_logout_mock_callable2.assert_not_called() # note 2 should be not called ever
socket_logout_mock_callable3.assert_called_once()

await asyncio.sleep(3)
# first socket should be closed now
assert not sio.sid
# second socket also closed
assert not sio3.sid
# first socket should be closed now
assert not sio.sid
# second socket also closed
assert not sio3.sid


@pytest.mark.parametrize(
Expand All @@ -401,7 +429,6 @@ async def test_websocket_disconnected_after_logout(
(UserRole.TESTER, True),
],
)
@pytest.mark.flaky(max_runs=3) # TODO: remove this flaky mark
async def test_interactive_services_removed_after_logout(
client: TestClient,
logged_user: dict[str, Any],
Expand All @@ -413,6 +440,7 @@ async def test_interactive_services_removed_after_logout(
storage_subsystem_mock: MockedStorageSubsystem, # when guest user logs out garbage is collected
director_v2_service_mock: aioresponses,
expected_save_state: bool,
open_project: Callable,
):
assert client.app

Expand Down Expand Up @@ -440,14 +468,10 @@ async def test_interactive_services_removed_after_logout(
await garbage_collector_core.collect_garbage(client.app)

# assert dynamic service is removed *this is done in a fire/forget way so give a bit of leeway
async for attempt in AsyncRetrying(
reraise=True, stop=stop_after_attempt(10), wait=wait_fixed(1)
):
async for attempt in AsyncRetrying(**_TENACITY_ASSERT_RETRY):
with attempt:
logger.warning(
"Waiting for stop to have been called service_uuid=%s, save_state=%s",
service["service_uuid"],
expected_save_state,
print(
f"--> Waiting for stop_dynamic_service with: {service['service_uuid']}, {expected_save_state=}",
)
mocked_director_v2_api[
"director_v2_core_dynamic_services.stop_dynamic_service"
Expand Down Expand Up @@ -477,6 +501,7 @@ async def test_interactive_services_remain_after_websocket_reconnection_from_2_t
storage_subsystem_mock, # when guest user logs out garbage is collected
expected_save_state: bool,
mocker: MockerFixture,
open_project: Callable,
):

# login - logged_user fixture
Expand Down Expand Up @@ -505,26 +530,29 @@ async def test_interactive_services_remain_after_websocket_reconnection_from_2_t
# We have no mock-up for the heatbeat...
await sio.disconnect()
assert not sio.sid
await asyncio.sleep(0.5) # let the thread call the method
socket_project_state_update_mock_callable.assert_called_with(
json.dumps(
{
"project_uuid": empty_user_project["uuid"],
"data": {
"locked": {
"value": False,
"owner": {
"user_id": logged_user["id"],
"first_name": logged_user["name"],
"last_name": "",
async for attempt in AsyncRetrying(
**(_TENACITY_ASSERT_RETRY | dict(wait=wait_fixed(0.1)))
):
with attempt:
socket_project_state_update_mock_callable.assert_called_with(
json.dumps(
{
"project_uuid": empty_user_project["uuid"],
"data": {
"locked": {
"value": False,
"owner": {
"user_id": logged_user["id"],
"first_name": logged_user["name"],
"last_name": "",
},
"status": "OPENED",
},
"state": {"value": "NOT_STARTED"},
},
"status": "OPENED",
},
"state": {"value": "NOT_STARTED"},
},
}
)
)
}
)
)
# open project in second client
await open_project(client, empty_user_project["uuid"], client_session_id2)
# ensure sufficient time is wasted here
Expand All @@ -547,13 +575,14 @@ async def test_interactive_services_remain_after_websocket_reconnection_from_2_t
mocked_director_v2_api["director_v2_api.stop_dynamic_service"].assert_not_called()
# now really disconnect
await sio2.disconnect()
await sio2.wait()
assert not sio2.sid
# run the garbage collector
# event after waiting some time
await asyncio.sleep(SERVICE_DELETION_DELAY + 1)
await garbage_collector_core.collect_garbage(client.app)

await asyncio.sleep(1)
await asyncio.sleep(0)
# assert dynamic service is gone
calls = [
call(
Expand Down Expand Up @@ -600,6 +629,7 @@ async def test_interactive_services_removed_per_project(
asyncpg_storage_system_mock,
storage_subsystem_mock, # when guest user logs out garbage is collected
expected_save_state: bool,
open_project: Callable,
):
# create server with delay set to DELAY
# login - logged_user fixture
Expand Down Expand Up @@ -701,6 +731,7 @@ async def test_services_remain_after_closing_one_out_of_two_tabs(
socketio_client_factory: Callable,
client_session_id_factory: Callable[[], str],
expected_save_state: bool,
open_project: Callable,
):
# create server with delay set to DELAY
# login - logged_user fixture
Expand Down Expand Up @@ -754,6 +785,7 @@ async def test_websocket_disconnected_remove_or_maintain_files_based_on_role(
storage_subsystem_mock, # when guest user logs out garbage is collected
expect_call: bool,
expected_save_state: bool,
open_project: Callable,
):
# login - logged_user fixture
# create empty study - empty_user_project fixture
Expand Down Expand Up @@ -789,7 +821,7 @@ async def test_websocket_disconnected_remove_or_maintain_files_based_on_role(
].assert_has_calls(calls)

# this call is done async, so wait a bit here to ensure it is correctly done
async for attempt in AsyncRetrying(reraise=True, stop=stop_after_delay(10)):
async for attempt in AsyncRetrying(**_TENACITY_ASSERT_RETRY):
with attempt:
if expect_call:
# make sure `delete_project` is called
Expand Down Expand Up @@ -837,8 +869,6 @@ async def test_regression_removing_unexisting_user(
user_name={"first_name": "my name is", "last_name": "pytest"},
)
# since the call to delete is happening as fire and forget task, let's wait until it is done
async for attempt in AsyncRetrying(reraise=True, stop=stop_after_delay(20)):
async for attempt in AsyncRetrying(**_TENACITY_ASSERT_RETRY):
with attempt:
mock_storage_delete_data_folders.assert_called()
# wait a bit here so the task is completed to prevent having unclosed loops
await asyncio.sleep(1)

0 comments on commit d960b19

Please sign in to comment.