Skip to content

Commit

Permalink
🎨 Enhances setup to remediate accumulation of messages in socketio ex…
Browse files Browse the repository at this point in the history
…change (#5341)
  • Loading branch information
pcrespov authored Feb 16, 2024
1 parent 4206404 commit 734e40e
Show file tree
Hide file tree
Showing 11 changed files with 311 additions and 243 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
SOCKET_IO_NODE_UPDATED_EVENT,
SOCKET_IO_PROJECT_PROGRESS_EVENT,
SOCKET_IO_WALLET_OSPARC_CREDITS_UPDATED_EVENT,
send_messages_to_group,
send_messages_to_user,
send_message_to_standard_group,
send_message_to_user,
)
from ..wallets import api as wallets_api
from ._constants import APP_RABBITMQ_CONSUMERS_KEY
Expand Down Expand Up @@ -97,67 +97,78 @@ async def _progress_message_parser(app: web.Application, data: bytes) -> bool:
rabbit_message: ProgressRabbitMessageNode | ProgressRabbitMessageProject = (
parse_raw_as(ProgressRabbitMessageNode | ProgressRabbitMessageProject, data)
)
socket_message: SocketMessageDict | None = None
message: SocketMessageDict | None = None
if isinstance(rabbit_message, ProgressRabbitMessageProject):
socket_message = _convert_to_project_progress_event(rabbit_message)
message = _convert_to_project_progress_event(rabbit_message)

elif rabbit_message.progress_type is ProgressType.COMPUTATION_RUNNING:
socket_message = await _convert_to_node_update_event(app, rabbit_message)
else:
socket_message = _convert_to_node_progress_event(rabbit_message)
if socket_message:
await send_messages_to_user(app, rabbit_message.user_id, [socket_message])
message = await _convert_to_node_update_event(app, rabbit_message)

else:
message = _convert_to_node_progress_event(rabbit_message)

if message:
await send_message_to_user(
app,
rabbit_message.user_id,
message=message,
ignore_queue=True,
)
return True


async def _log_message_parser(app: web.Application, data: bytes) -> bool:
rabbit_message = LoggerRabbitMessage.parse_raw(data)
socket_messages: list[SocketMessageDict] = [
{
"event_type": SOCKET_IO_LOG_EVENT,
"data": rabbit_message.dict(exclude={"user_id", "channel_name"}),
}
]
await send_messages_to_user(app, rabbit_message.user_id, socket_messages)
await send_message_to_user(
app,
rabbit_message.user_id,
message=SocketMessageDict(
event_type=SOCKET_IO_LOG_EVENT,
data=rabbit_message.dict(exclude={"user_id", "channel_name"}),
),
ignore_queue=True,
)
return True


async def _events_message_parser(app: web.Application, data: bytes) -> bool:
rabbit_message = EventRabbitMessage.parse_raw(data)

socket_messages: list[SocketMessageDict] = [
{
"event_type": SOCKET_IO_EVENT,
"data": {
await send_message_to_user(
app,
rabbit_message.user_id,
message=SocketMessageDict(
event_type=SOCKET_IO_EVENT,
data={
"action": rabbit_message.action,
"node_id": f"{rabbit_message.node_id}",
},
}
]
await send_messages_to_user(app, rabbit_message.user_id, socket_messages)
),
ignore_queue=True,
)
return True


async def _osparc_credits_message_parser(app: web.Application, data: bytes) -> bool:
rabbit_message = parse_raw_as(WalletCreditsMessage, data)
socket_messages: list[SocketMessageDict] = [
{
"event_type": SOCKET_IO_WALLET_OSPARC_CREDITS_UPDATED_EVENT,
"data": {
"wallet_id": rabbit_message.wallet_id,
"osparc_credits": rabbit_message.credits,
"created_at": rabbit_message.created_at,
},
}
]
wallet_groups = await wallets_api.list_wallet_groups_with_read_access_by_wallet(
app, wallet_id=rabbit_message.wallet_id
)
rooms_to_notify: Generator[GroupID, None, None] = (
item.gid for item in wallet_groups
)
for room in rooms_to_notify:
await send_messages_to_group(app, room, socket_messages)
await send_message_to_standard_group(
app,
room,
message=SocketMessageDict(
event_type=SOCKET_IO_WALLET_OSPARC_CREDITS_UPDATED_EVENT,
data={
"wallet_id": rabbit_message.wallet_id,
"osparc_credits": rabbit_message.credits,
"created_at": rabbit_message.created_at,
},
),
)
return True


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from models_library.users import UserID
from models_library.utils.fastapi_encoders import jsonable_encoder

from ..socketio.messages import send_messages_to_user
from ..socketio.messages import send_message_to_user


async def notify_payment_completed(
Expand All @@ -22,13 +22,15 @@ async def notify_payment_completed(
):
assert payment.completed_at is not None # nosec

messages: list[SocketMessageDict] = [
{
"event_type": SOCKET_IO_PAYMENT_COMPLETED_EVENT,
"data": jsonable_encoder(payment, by_alias=True),
}
]
await send_messages_to_user(app, user_id, messages)
await send_message_to_user(
app,
user_id,
message=SocketMessageDict(
event_type=SOCKET_IO_PAYMENT_COMPLETED_EVENT,
data=jsonable_encoder(payment, by_alias=True),
),
ignore_queue=True,
)


async def notify_payment_method_acked(
Expand All @@ -37,10 +39,12 @@ async def notify_payment_method_acked(
user_id: UserID,
payment_method_transaction: PaymentMethodTransaction,
):
messages: list[SocketMessageDict] = [
{
"event_type": SOCKET_IO_PAYMENT_METHOD_ACKED_EVENT,
"data": jsonable_encoder(payment_method_transaction, by_alias=True),
}
]
await send_messages_to_user(app, user_id, messages)
await send_message_to_user(
app,
user_id,
message=SocketMessageDict(
event_type=SOCKET_IO_PAYMENT_METHOD_ACKED_EVENT,
data=jsonable_encoder(payment_method_transaction, by_alias=True),
),
ignore_queue=True,
)
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@
from ..socketio.messages import (
SOCKET_IO_NODE_UPDATED_EVENT,
SOCKET_IO_PROJECT_UPDATED_EVENT,
send_messages_to_group,
send_messages_to_user,
send_message_to_standard_group,
send_message_to_user,
)
from ..storage import api as storage_api
from ..users.api import FullNameDict, get_user_fullname, get_user_role
Expand Down Expand Up @@ -1441,30 +1441,31 @@ async def remove_project_dynamic_services(
async def notify_project_state_update(
app: web.Application,
project: ProjectDict,
notify_only_user: int | None = None,
notify_only_user: UserID | None = None,
) -> None:
if await is_project_hidden(app, ProjectID(project["uuid"])):
return
messages: list[SocketMessageDict] = [
{
"event_type": SOCKET_IO_PROJECT_UPDATED_EVENT,
"data": {
"project_uuid": project["uuid"],
"data": project["state"],
},
}
]
message = SocketMessageDict(
event_type=SOCKET_IO_PROJECT_UPDATED_EVENT,
data={
"project_uuid": project["uuid"],
"data": project["state"],
},
)

if notify_only_user:
await send_messages_to_user(
app, user_id=f"{notify_only_user}", messages=messages
await send_message_to_user(
app,
user_id=notify_only_user,
message=message,
ignore_queue=True,
)
else:
rooms_to_notify: Generator[GroupID, None, None] = (
gid for gid, rights in project["accessRights"].items() if rights["read"]
)
for room in rooms_to_notify:
await send_messages_to_group(app, room, messages)
await send_message_to_standard_group(app, group_id=room, message=message)


async def notify_project_node_update(
Expand All @@ -1480,22 +1481,20 @@ async def notify_project_node_update(
gid for gid, rights in project["accessRights"].items() if rights["read"]
]

messages: list[SocketMessageDict] = [
{
"event_type": SOCKET_IO_NODE_UPDATED_EVENT,
"data": {
"project_id": project["uuid"],
"node_id": f"{node_id}",
# as GET projects/{project_id}/nodes/{node_id}
"data": project["workbench"][f"{node_id}"],
# as GET projects/{project_id}/nodes/{node_id}/errors
"errors": errors,
},
}
]
message = SocketMessageDict(
event_type=SOCKET_IO_NODE_UPDATED_EVENT,
data={
"project_id": project["uuid"],
"node_id": f"{node_id}",
# as GET projects/{project_id}/nodes/{node_id}
"data": project["workbench"][f"{node_id}"],
# as GET projects/{project_id}/nodes/{node_id}/errors
"errors": errors,
},
)

for room in rooms_to_notify:
await send_messages_to_group(app, room, messages)
await send_message_to_standard_group(app, room, message)


async def retrieve_and_notify_project_locked_state(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from ..products.api import Product, get_current_product
from ..resource_manager.user_sessions import managed_resource
from ._utils import EnvironDict, SocketID, get_socket_server, register_socketio_handler
from .messages import SOCKET_IO_HEARTBEAT_EVENT, send_messages_to_user
from .messages import SOCKET_IO_HEARTBEAT_EVENT, send_message_to_user

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -138,16 +138,14 @@ async def connect(
product_name,
)

heart_beat_messages: list[SocketMessageDict] = [
{
"event_type": SOCKET_IO_HEARTBEAT_EVENT,
"data": {"interval": _EMIT_INTERVAL_S},
}
]
await send_messages_to_user(
await send_message_to_user(
app,
user_id,
heart_beat_messages,
message=SocketMessageDict(
event_type=SOCKET_IO_HEARTBEAT_EVENT,
data={"interval": _EMIT_INTERVAL_S},
),
ignore_queue=True,
)

except web.HTTPUnauthorized as exc:
Expand Down
Loading

0 comments on commit 734e40e

Please sign in to comment.