Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg committed Apr 24, 2023
1 parent 33f0ce0 commit 5c32b7a
Show file tree
Hide file tree
Showing 11 changed files with 46 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
from .login.plugin import setup_login
from .long_running_tasks import setup_long_running_tasks
from .meta_modeling.plugin import setup_meta_modeling
from .notifications.rabbitmq import setup_rabbitmq
from .products import setup_products
from .projects.plugin import setup_projects
from .publications import setup_publications
from .rabbitmq.rabbitmq import setup_rabbitmq
from .redis import setup_redis
from .remote_debug import setup_remote_debugging
from .resource_manager.plugin import setup_resource_manager
Expand Down
6 changes: 0 additions & 6 deletions services/web/server/src/simcore_service_webserver/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
)
from tenacity import retry

from .application_settings import get_settings
from .computation_comp_tasks_listening_task import create_comp_tasks_listening_task
from .db_settings import PostgresSettings, get_plugin_settings

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -102,7 +100,3 @@ def setup_db(app: web.Application) -> None:

# async connection to db
app.cleanup_ctx.append(postgres_cleanup_ctx)

app_settings = get_settings(app)
if app_settings.WEBSERVER_COMPUTATION:
app.cleanup_ctx.append(create_comp_tasks_listening_task)
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
ServiceWaitingForManualIntervention,
)
from .director_v2_settings import DirectorV2Settings, get_plugin_settings
from .rabbitmq.rabbitmq import get_rabbitmq_client
from .notifications.rabbitmq import get_rabbitmq_client

log = logging.getLogger(__name__)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
from simcore_postgres_database.webserver_models import DB_CHANNEL_NAME, projects
from sqlalchemy.sql import select

from ..projects import projects_api, projects_exceptions
from ..projects.projects_nodes_utils import update_node_outputs
from .computation_utils import convert_state_from_db
from .projects import projects_api, projects_exceptions
from .projects.projects_nodes_utils import update_node_outputs

log = logging.getLogger(__name__)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
ProgressType,
)
from pydantic import parse_raw_as
from servicelib.aiohttp.application_keys import APP_RABBITMQ_CLIENT_KEY
from servicelib.aiohttp.monitor_services import (
SERVICE_STARTED_LABELS,
SERVICE_STOPPED_LABELS,
Expand All @@ -25,6 +24,7 @@

from ..projects import projects_api
from ..projects.projects_exceptions import NodeNotFoundError, ProjectNotFoundError
from ..rabbitmq import get_rabbitmq_client
from ..socketio.events import (
SOCKET_IO_EVENT,
SOCKET_IO_LOG_EVENT,
Expand Down Expand Up @@ -179,7 +179,7 @@ async def _events_message_parser(app: web.Application, data: bytes) -> bool:

async def setup_rabbitmq_consumers(app: web.Application) -> AsyncIterator[None]:
with log_context(logger, logging.INFO, msg="Subscribing to rabbitmq channels"):
rabbit_client: RabbitMQClient = app[APP_RABBITMQ_CLIENT_KEY]
rabbit_client: RabbitMQClient = get_rabbitmq_client(app)

for exchange_name, parser_fct, queue_kwargs in EXCHANGE_TO_PARSER_CONFIG:
await rabbit_client.subscribe(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""
computation module is the main entry-point for computational backend
"""
import logging

from aiohttp import web
from servicelib.aiohttp.application_setup import ModuleCategory, app_module_setup

from ..db import setup_db
from ..rabbitmq import setup_rabbitmq
from ._computation_comp_tasks_listening_task import create_comp_tasks_listening_task
from ._rabbitmq_consumers import setup_rabbitmq_consumers

log = logging.getLogger(__name__)


@app_module_setup(
__name__,
ModuleCategory.ADDON,
settings_name="WEBSERVER_COMPUTATION",
logger=log,
depends=[
"simcore_service_webserver.diagnostics",
], # depends on diagnostics for setting the instrumentation
)
def setup_computation(app: web.Application):
setup_rabbitmq(app)
# Subscribe to rabbit upon startup for logs, progress and other
# metrics on the execution reported by sidecars
app.cleanup_ctx.append(setup_rabbitmq_consumers)

# Creates a task to listen to comp_task pg-db's table events
setup_db(app)
app.cleanup_ctx.append(create_comp_tasks_listening_task)
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
from servicelib.rabbitmq import RabbitMQClient
from servicelib.rabbitmq_utils import wait_till_rabbitmq_responsive

from ..application_settings import get_settings
from .consumers import setup_rabbitmq_consumers
from .rabbitmq_settings import RabbitSettings, get_plugin_settings

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -43,9 +41,6 @@ async def _rabbitmq_client_cleanup_ctx(app: web.Application) -> AsyncIterator[No
)
def setup_rabbitmq(app: web.Application) -> None:
app.cleanup_ctx.append(_rabbitmq_client_cleanup_ctx)
app_settings = get_settings(app)
if app_settings.WEBSERVER_COMPUTATION:
app.cleanup_ctx.append(setup_rabbitmq_consumers)


def get_rabbitmq_client(app: web.Application) -> RabbitMQClient:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
)
from pydantic import BaseModel
from pytest_mock import MockerFixture
from simcore_service_webserver.rabbitmq import consumers
from simcore_service_webserver.notifications import _rabbitmq_consumers

_faker = Faker()

Expand All @@ -25,7 +25,9 @@ def mock_send_messages(mocker: MockerFixture) -> Iterator[dict]:
async def mock_send_message(*args) -> None:
reference["args"] = args

mocker.patch.object(consumers, "send_messages", side_effect=mock_send_message)
mocker.patch.object(
_rabbitmq_consumers, "send_messages", side_effect=mock_send_message
)

yield reference

Expand Down Expand Up @@ -63,7 +65,7 @@ async def mock_send_message(*args) -> None:
async def test_regression_progress_message_parser(
mock_send_messages: dict, raw_data: bytes, class_type: type[BaseModel]
):
await consumers._progress_message_parser(AsyncMock(), raw_data)
await _rabbitmq_consumers._progress_message_parser(AsyncMock(), raw_data)
serialized_sent_data = mock_send_messages["args"][2][0]["data"]
# check that all fields are sent as expected
assert class_type.parse_obj(serialized_sent_data)

0 comments on commit 5c32b7a

Please sign in to comment.