Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Devhawk/async #127

Open
wants to merge 90 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
90 commits
Select commit Hold shift + click to select a range
e844f4d
sqlalchemy[asyncio]
devhawk Sep 20, 2024
b23611e
WIP
devhawk Sep 21, 2024
a145e7f
async engine dispose
devhawk Sep 21, 2024
84f6ddf
WIP
devhawk Sep 21, 2024
ac378ad
wip
devhawk Sep 21, 2024
99972e2
async get_workflows
devhawk Sep 21, 2024
968d75d
update_workflow_inputs
devhawk Sep 21, 2024
e79c211
update_workflow_status
devhawk Sep 21, 2024
17e1554
wip
devhawk Sep 21, 2024
e9e9b79
get_workflow_status_within_wf
devhawk Sep 21, 2024
f3462bb
await_workflow_result
devhawk Sep 21, 2024
4c6e73f
remove sysdb engine
devhawk Sep 21, 2024
57c7f17
use a runner
devhawk Sep 21, 2024
512f88b
remove asyncio.runner (not available before v3.11)
devhawk Sep 23, 2024
b6d723f
Merge remote-tracking branch 'origin/main' into devhawk/async-sysdb
devhawk Sep 24, 2024
fbfb25e
core recv/_set_event/_get_event/sleep (send requires WF infra first)
devhawk Sep 24, 2024
a354061
async _init_workflow
devhawk Sep 24, 2024
9ec1ce9
inital async WF
devhawk Sep 25, 2024
1be23c7
async step
devhawk Sep 25, 2024
6982fc3
async app db methods
devhawk Sep 25, 2024
df437a6
async tx support
devhawk Sep 25, 2024
463afa2
_register_send_wf
devhawk Sep 25, 2024
e6faed5
async _send
devhawk Sep 25, 2024
0fc762b
Merge remote-tracking branch 'origin/main' into devhawk/async-sysdb
devhawk Sep 25, 2024
f3bd6b5
Merge branch 'main' into devhawk/async-sysdb
devhawk Sep 25, 2024
ee65170
Merge branch 'main' into devhawk/async-sysdb
devhawk Sep 26, 2024
d4cd1e7
Merge branch 'main' into devhawk/async-sysdb
devhawk Sep 27, 2024
1b8bd3e
add async launch/destroy methods
devhawk Sep 27, 2024
c140e0d
add example app for test purposes (to be deleted later)
devhawk Sep 27, 2024
45e9945
Merge branch 'main' into devhawk/async
devhawk Sep 27, 2024
db795ef
WIP
devhawk Sep 28, 2024
a7e2f7c
Merge remote-tracking branch 'origin/main' into devhawk/async
devhawk Sep 30, 2024
4aa9b86
WIP
devhawk Sep 30, 2024
c244384
Revert "WIP"
devhawk Sep 30, 2024
a69486c
fix comments
devhawk Sep 30, 2024
701ddb8
DRY
devhawk Sep 30, 2024
476e80f
DRY more
devhawk Sep 30, 2024
f7e1ff1
still more DRY
devhawk Sep 30, 2024
2e86312
fix mypy
devhawk Oct 1, 2024
fef4200
dry _start_workflow_async
devhawk Oct 1, 2024
a8752f1
there should not be an async version of _start_workflow
devhawk Oct 2, 2024
3d2dada
fix _execute_workflow_async to support sync/async wf funcs
devhawk Oct 2, 2024
01a4e37
fix submit_args
devhawk Oct 2, 2024
750f4ed
WorkflowHandle async methods
devhawk Oct 2, 2024
14ced5c
run_coroutine
devhawk Oct 4, 2024
419feb5
mypy fixes
devhawk Oct 4, 2024
eb98ab2
Merge branch 'main' into devhawk/async
devhawk Oct 7, 2024
770cf22
Merge branch 'main' into devhawk/async
devhawk Oct 9, 2024
60c3aee
remove run_coroutine
devhawk Oct 11, 2024
8cf4ce2
move core into _core and break into separate files
devhawk Oct 11, 2024
8834935
fix incorrect underscore
devhawk Oct 12, 2024
b1d5592
move sys db into _core
devhawk Oct 12, 2024
29a590c
move utils into _core and rename as serialization
devhawk Oct 12, 2024
276b8db
Merge remote-tracking branch 'origin/main' into devhawk/async
devhawk Oct 14, 2024
2dd148e
remove asyncio.run app db
devhawk Oct 14, 2024
ffc15c2
update_workflow_status_sync
devhawk Oct 14, 2024
ea3598c
get_workflow_status_sync
devhawk Oct 14, 2024
44b584b
more sync sysdb
devhawk Oct 14, 2024
1a9e47d
more sys db
devhawk Oct 14, 2024
811c7f6
more sysdb
devhawk Oct 14, 2024
77069e3
WIP
devhawk Oct 15, 2024
941520a
WIP
devhawk Oct 15, 2024
474ea8e
WIP
devhawk Oct 15, 2024
04b76c8
WIP
devhawk Oct 15, 2024
45e8830
queue sync
devhawk Oct 15, 2024
55fcc3a
sync/async wf status buffer flush
devhawk Oct 15, 2024
6a20272
dbos sleep sync
devhawk Oct 15, 2024
2aff09c
more sync
devhawk Oct 15, 2024
656d1ab
send sync
devhawk Oct 15, 2024
503e8e1
Merge branch 'main' into devhawk/async
devhawk Oct 15, 2024
1da5a93
Merge remote-tracking branch 'origin/main' into devhawk/async
devhawk Oct 16, 2024
75c9bf7
move appdb module to _core
devhawk Oct 16, 2024
8f58d7c
move schemas and migrations into _core
devhawk Oct 16, 2024
c9009ea
minor cleanup
devhawk Oct 16, 2024
99f4b2d
move recovery to _core
devhawk Oct 16, 2024
91d7d7c
move admin server to _core
devhawk Oct 16, 2024
dac1e8e
fix test_schema_migration
devhawk Oct 16, 2024
a352d75
tracer -> _core
devhawk Oct 16, 2024
d386db6
move roles, request, registrations modules to _core
devhawk Oct 16, 2024
2afe309
rename decorators module to classproperty
devhawk Oct 16, 2024
c5db922
fastapi, flask, kafka and logger -> _core
devhawk Oct 16, 2024
2e8c52b
scheduler & croniter -> _core
devhawk Oct 16, 2024
aa01783
queue_thread -> _core
devhawk Oct 16, 2024
6c94f70
relative imports
devhawk Oct 16, 2024
ecbfd96
cleanup tests
devhawk Oct 17, 2024
1b4557c
update appdb to consistent naming of sync/async engine and methods w/…
devhawk Oct 17, 2024
dc64777
sync destroy support
devhawk Oct 18, 2024
252dbaf
remove flush_workflow_buffers_async
devhawk Oct 18, 2024
038a89a
remove example
devhawk Oct 18, 2024
b883ee9
Merge remote-tracking branch 'origin/main' into devhawk/async
devhawk Oct 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from .dbos_config import ConfigFile, get_dbos_database_url, load_config
from .kafka_message import KafkaMessage
from .queue import Queue
from .system_database import GetWorkflowsInput, WorkflowStatusString
from .types import GetWorkflowsInput, WorkflowStatusString

__all__ = [
"ConfigFile",
Expand Down
7 changes: 3 additions & 4 deletions dbos/admin_sever.py → dbos/_core/admin_sever.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@

import psutil

from dbos.recovery import _recover_pending_workflows

from .logger import dbos_logger
from .recovery import recover_pending_workflows

if TYPE_CHECKING:
from .dbos import DBOS
from ..dbos import DBOS

health_check_path = "/dbos-healthz"
workflow_recovery_path = "/dbos-workflow-recovery"
Expand Down Expand Up @@ -79,7 +78,7 @@ def do_POST(self) -> None:
if self.path == workflow_recovery_path:
executor_ids: List[str] = json.loads(post_data.decode("utf-8"))
dbos_logger.info("Recovering workflows for executors: %s", executor_ids)
workflow_handles = _recover_pending_workflows(self.dbos, executor_ids)
workflow_handles = recover_pending_workflows(self.dbos, executor_ids)
workflow_ids = [handle.workflow_id for handle in workflow_handles]
self.send_response(200)
self._end_headers()
Expand Down
113 changes: 70 additions & 43 deletions dbos/application_database.py → dbos/_core/application_database.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,16 @@
from typing import Optional, TypedDict, cast
import asyncio
from typing import Optional

import sqlalchemy as sa
import sqlalchemy.dialects.postgresql as pg
from sqlalchemy.exc import DBAPIError
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import Session, sessionmaker

from dbos.error import DBOSWorkflowConflictIDError
from dbos.schemas.application_database import ApplicationSchema

from .dbos_config import ConfigFile


class TransactionResultInternal(TypedDict):
workflow_uuid: str
function_id: int
output: Optional[str] # JSON (jsonpickle)
error: Optional[str] # JSON (jsonpickle)
txn_id: Optional[str]
txn_snapshot: str
executor_id: Optional[str]


class RecordedResult(TypedDict):
output: Optional[str] # JSON (jsonpickle)
error: Optional[str] # JSON (jsonpickle)
from ..dbos_config import ConfigFile
from ..error import DBOSWorkflowConflictIDError
from .schemas.application_database import ApplicationSchema
from .types import RecordedResult, TransactionResultInternal


class ApplicationDatabase:
Expand Down Expand Up @@ -61,24 +48,36 @@ def __init__(self, config: ConfigFile):
port=config["database"]["port"],
database=app_db_name,
)
self.engine = sa.create_engine(
self.sync_engine = sa.create_engine(
app_db_url, pool_size=20, max_overflow=5, pool_timeout=30
)
self.sessionmaker = sessionmaker(bind=self.sync_engine)
self.async_engine = create_async_engine(
app_db_url, pool_size=20, max_overflow=5, pool_timeout=30
)
self.sessionmaker = sessionmaker(bind=self.engine)
self.async_sessionmaker = async_sessionmaker(self.async_engine)

# Create the dbos schema and transaction_outputs table in the application database
with self.engine.begin() as conn:
with self.sync_engine.begin() as conn:
schema_creation_query = sa.text(
f"CREATE SCHEMA IF NOT EXISTS {ApplicationSchema.schema}"
)
conn.execute(schema_creation_query)
ApplicationSchema.metadata_obj.create_all(self.engine)
ApplicationSchema.metadata_obj.create_all(self.sync_engine)

def destroy(self) -> None:
self.engine.dispose()
self.sync_engine.dispose()
# As per the SQLAlchemy docs, the AsyncEngine.sync_engine field is public so it can be used as an event target
# However, under the hood, AsyncEngine calls sync_engine.dispose in a greenlit, so it is likely OK to
# call it directly for sync disposal
self.async_engine.sync_engine.dispose()

async def destroy_async(self) -> None:
self.sync_engine.dispose()
await self.async_engine.dispose()

@staticmethod
def record_transaction_output(
def record_transaction_output_sync(
session: Session, output: TransactionResultInternal
) -> None:
try:
Expand All @@ -100,31 +99,49 @@ def record_transaction_output(
raise DBOSWorkflowConflictIDError(output["workflow_uuid"])
raise

def record_transaction_error(self, output: TransactionResultInternal) -> None:
@staticmethod
async def record_transaction_output_async(
session: AsyncSession, output: TransactionResultInternal
) -> None:
await session.run_sync(
ApplicationDatabase.record_transaction_output_sync, output
)

@staticmethod
def _record_transaction_error(
conn: sa.Connection, output: TransactionResultInternal
) -> None:
try:
with self.engine.begin() as conn:
conn.execute(
pg.insert(ApplicationSchema.transaction_outputs).values(
workflow_uuid=output["workflow_uuid"],
function_id=output["function_id"],
output=None,
error=output["error"],
txn_id=sa.text(
"(select pg_current_xact_id_if_assigned()::text)"
),
txn_snapshot=output["txn_snapshot"],
executor_id=(
output["executor_id"] if output["executor_id"] else None
),
)
conn.execute(
pg.insert(ApplicationSchema.transaction_outputs).values(
workflow_uuid=output["workflow_uuid"],
function_id=output["function_id"],
output=None,
error=output["error"],
txn_id=sa.text("(select pg_current_xact_id_if_assigned()::text)"),
txn_snapshot=output["txn_snapshot"],
executor_id=(
output["executor_id"] if output["executor_id"] else None
),
)
)
except DBAPIError as dbapi_error:
if dbapi_error.orig.sqlstate == "23505": # type: ignore
raise DBOSWorkflowConflictIDError(output["workflow_uuid"])
raise

def record_transaction_error_sync(self, output: TransactionResultInternal) -> None:
with self.sync_engine.begin() as conn:
ApplicationDatabase._record_transaction_error(conn, output)

async def record_transaction_error_async(
self, output: TransactionResultInternal
) -> None:
async with self.async_engine.begin() as conn:
await conn.run_sync(ApplicationDatabase._record_transaction_error, output)

@staticmethod
def check_transaction_execution(
def check_transaction_execution_sync(
session: Session, workflow_uuid: str, function_id: int
) -> Optional[RecordedResult]:
rows = session.execute(
Expand All @@ -143,3 +160,13 @@ def check_transaction_execution(
"error": rows[0][1],
}
return result

@staticmethod
async def check_transaction_execution_async(
session: AsyncSession, workflow_uuid: str, function_id: int
) -> Optional[RecordedResult]:
return await session.run_sync(
ApplicationDatabase.check_transaction_execution_sync,
workflow_uuid,
function_id,
)
Loading
Loading