Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Devhawk/async #127

Open
wants to merge 90 commits into
base: main
Choose a base branch
from
Open

Devhawk/async #127

wants to merge 90 commits into from

Conversation

devhawk
Copy link
Contributor

@devhawk devhawk commented Oct 2, 2024

Add support for async workflow/step/transaction methods
Also moves internal code to _core package to indicate items that should not be directly imported

fixes #112

@devhawk devhawk marked this pull request as ready for review October 16, 2024 22:47
@devhawk devhawk requested review from kraftp, qianl15 and chuck-dbos and removed request for kraftp October 16, 2024 23:10
)

@classmethod
async def get_workflow_status_async(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For async methods like this (and all the other DBOS methods with _async suffix), we have talked about moving them to a separate AsyncDBOS type. Is that still the desired approach @chuck-dbos @kraftp @qianl15

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think yes

tests/test_flask.py Outdated Show resolved Hide resolved
workflow_uuid,
status,
reset_recovery_attempts,
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The case above looks like it could have been changed to create the statement in the function, and that running the statement async or sync against the connection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I suspect most if not all of the _some_func / some_func_sync / some_func_async pattern methods could be implemented that way. I can change these to work that way if we'd rather

self, workflow_uuid: str
) -> Optional[WorkflowStatusInternal]:
async with self.async_engine.begin() as c:
return await c.run_sync(self._get_workflow_status, workflow_uuid)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a lot of these where await session.execute could be tried.

actual_timeout = await self.sleep_async(
workflow_uuid, timeout_function_id, timeout_seconds, skip_sleep=True
)
condition.wait(timeout=actual_timeout)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So... this is not good if the code that is supposed to do the notification is in the same event loop, because it's a deadlock then. Probably it is not. But it is fiddly to reason about.

== oldest_entry_cte.c.created_at_epoch_ms,
)
.returning(SystemSchema.notifications.c.message)
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this statement code be shared?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, will fix

)

payload = f"{target_uuid}::{key}"
condition = threading.Condition()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

asyncio.Condition?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was worried about having different condition classes for sync/async versions. I will investigate further

)


def start_workflow(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a start_workflow_async that I missed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is not. start_workflow is already async-ish in that it returns a WorkflowHandle. Seemed odd to have a version that returned an Awaitable[WorkflowHandle]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but that is what is necessary to allow the event loop to proceed while the database call to record the workflow status is occurring.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's 100% necessary

@@ -0,0 +1,52 @@
import uuid
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need a LOT more async tests than this. In particular, we need tests exercising every part of the async API just like for the sync API--because there's so much code duplication we need to test both sides of the code.

Depending on our API, we may also need tests for workflows that mix async and sync steps. If we don't want to support those, we have to deliberately design our API to make that impossible, for example by having a DBOS and AsyncDBOS.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, though I'm not sure separating Sync vs Async DBOS will make it impossible

example/main.py Outdated Show resolved Hide resolved
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Asyncio
3 participants