Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Events: Reusable events #310

Merged
merged 6 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 11 additions & 28 deletions api/addons/install.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from ayon_server.installer.addons import get_addon_zip_info
from ayon_server.lib.postgres import Postgres
from ayon_server.types import Field, OPModel
from ayon_server.utils import hash_data

from .router import router

Expand Down Expand Up @@ -68,34 +69,16 @@ async def upload_addon_zip_file(
# and contains an addon. If it doesn't, an exception is raised before
# we reach this point.

# Let's check if we installed this addon before

query = """
SELECT id FROM events
WHERE topic = 'addon.install'
AND summary->>'name' = $1
AND summary->>'version' = $2
LIMIT 1
"""

res = await Postgres.fetch(query, zip_info.name, zip_info.version)
if res:
event_id = res[0]["id"]
await EventStream.update(
event_id,
description="Reinstalling addon from zip file",
summary=zip_info.dict(exclude_none=True),
status="pending",
)
else:
# If not, dispatch a new event
event_id = await EventStream.dispatch(
"addon.install",
description=f"Installing addon {zip_info.name} {zip_info.version}",
summary=zip_info.dict(exclude_none=True),
user=user.name,
finished=False,
)
event_hash = hash_data(["addon.install", zip_info.name, zip_info.version])
event_id = await EventStream.dispatch(
"addon.install",
hash=event_hash,
description=f"Installing addon {zip_info.name} {zip_info.version}",
summary=zip_info.dict(exclude_none=True),
user=user.name,
finished=False,
reuse=True,
)

# Start the installation in the background
# And return the event ID to the client,
Expand Down
10 changes: 8 additions & 2 deletions api/events/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ class DispatchEventRequestModel(OPModel):
description="Set to False for fire-and-forget events",
example=True,
)
reuse: bool = Field(
False,
title="Reuse",
description="Allow reusing events with the same hash",
example=False,
)


class UpdateEventRequestModel(OPModel):
Expand Down Expand Up @@ -112,6 +118,7 @@ async def post_event(
payload=request.payload,
finished=request.finished,
store=request.store,
reuse=request.reuse,
)
return DispatchEventResponseModel(id=event_id)

Expand Down Expand Up @@ -193,6 +200,5 @@ async def delete_event(user: CurrentUser, event_id: EventID) -> EmptyResponse:
if not user.is_admin:
raise ForbiddenException("Not allowed to delete events")

await Postgres.execute("DELETE FROM events WHERE id = $1", event_id)

await EventStream.delete(event_id)
return EmptyResponse()
17 changes: 7 additions & 10 deletions ayon_server/api/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from nxtools import logging

from ayon_server.events import EventStream
from ayon_server.exceptions import ConstraintViolationException
from ayon_server.lib.postgres import Postgres


Expand Down Expand Up @@ -40,15 +39,13 @@ async def require_server_restart(
if reason is None:
reason = "Server restart is required"

try:
await EventStream.dispatch(
topic, hash=topic, description=reason, user=user_name
)
except ConstraintViolationException:
# we don't need to do anything here. If the event fails,
# it means the event was already triggered, and the server
# is pending restart.
pass
await EventStream.dispatch(
topic,
hash=topic,
description=reason,
user=user_name,
reuse=True,
)


async def clear_server_restart_required():
Expand Down
17 changes: 17 additions & 0 deletions ayon_server/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,15 @@ async def dispatch_event(
payload: dict[str, Any] | None = None,
finished: bool = True,
store: bool = True,
reuse: bool = False,
) -> str:
"""Dispatch an event to the event stream.

This function is deprecated and is provided only to
maintain backwards compatibility with older addons.

Use `EventStream.dispatch` instead.
"""
return await EventStream.dispatch(
topic=topic,
sender=sender,
Expand All @@ -40,6 +48,7 @@ async def dispatch_event(
payload=payload,
finished=finished,
store=store,
reuse=reuse,
)


Expand All @@ -58,6 +67,14 @@ async def update_event(
store: bool = True,
retries: int | None = None,
) -> bool:
"""Update an event in the event stream.

This function is deprecated and is provided only to
maintain backwards compatibility with older addons.

Use `EventStream.update` instead.

"""
return await EventStream.update(
event_id=event_id,
sender=sender,
Expand Down
78 changes: 62 additions & 16 deletions ayon_server/events/eventstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ async def dispatch(
payload: dict[str, Any] | None = None,
finished: bool = True,
store: bool = True,
reuse: bool = False,
recipients: list[str] | None = None,
) -> str:
"""
Expand All @@ -61,6 +62,9 @@ async def dispatch(
store:
whether to store the event in the database

reuse:
allow to reuse an existing event with the same hash

recipients:
list of user names to notify via websocket (None for all users)
"""
Expand Down Expand Up @@ -95,30 +99,67 @@ async def dispatch(
)

if store:
query = SQLTool.insert(
table="events",
id=event.id,
hash=event.hash,
sender=event.sender,
sender_type=event.sender_type,
topic=event.topic,
project_name=event.project,
user_name=event.user,
depends_on=depends_on,
status=status,
description=description,
summary=event.summary,
payload=event.payload,
)
query = """
INSERT INTO
events (
id,
hash,
sender,
sender_type,
topic,
project_name,
user_name,
depends_on,
status,
description,
summary,
payload
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
"""
if reuse:
query += """
ON CONFLICT (hash) DO UPDATE SET
id = EXCLUDED.id,
sender = EXCLUDED.sender,
sender_type = EXCLUDED.sender_type,
topic = EXCLUDED.topic,
project_name = EXCLUDED.project_name,
user_name = EXCLUDED.user_name,
depends_on = EXCLUDED.depends_on,
status = EXCLUDED.status,
description = EXCLUDED.description,
summary = EXCLUDED.summary,
payload = EXCLUDED.payload,
updated_at = NOW()
"""

try:
await Postgres.execute(*query)
await Postgres.execute(
query,
event.id,
event.hash,
event.sender,
event.sender_type,
event.topic,
event.project,
event.user,
event.depends_on,
status,
description,
event.summary,
event.payload,
)
except Postgres.ForeignKeyViolationError as e:
raise ConstraintViolationException(
"Event depends on non-existing event",
) from e

except Postgres.UniqueViolationError as e:
if reuse:
raise ConstraintViolationException(
"Unable to reuse the event. Another event depends on it",
) from e
raise ConstraintViolationException(
"Event with same hash already exists",
) from e
Expand Down Expand Up @@ -274,3 +315,8 @@ async def get(cls, event_id: str) -> EventModel:
if event is None:
raise NotFoundException("Event not found")
return event

@classmethod
async def delete(cls, event_id: str) -> None:
await Postgres.execute("DELETE FROM events WHERE id = $1", event_id)
return None