[POC][WIP] Async SQLAlchemy sessions in Airflow #36504

Draft
wants to merge 1 commit into base: main
Conversation

hussein-awala
Member

The Airflow metadata database is the brain of Airflow; it is the central component used to store the state of all operations and to enable communication between the other components.

A lot of the execution time of Airflow components is wasted waiting for input from this metadata database, which we can avoid by querying the database asynchronously: during the waiting time we can perform other operations or send more requests to the database.

Since version 1.4, SQLAlchemy supports asynchronous I/O (asyncio), which we can use to improve all our components:

  • improving scheduler logic
  • improving the async execution in the executors
  • making the Triggerer fully asynchronous
  • loading variables/connections asynchronously in the triggers and the async hooks.
  • and, of course, improving the performance of our API and webserver, which will serve many more users with lower resource usage.
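The core benefit, overlapping database wait time with other work, can be sketched with plain asyncio; `fake_db_query` below is a toy stand-in for an awaitable metadata query, not code from this PR:

```python
import asyncio
import time


async def fake_db_query(i: int) -> int:
    # stand-in for an awaitable metadata query taking ~0.1 s
    await asyncio.sleep(0.1)
    return i


async def main() -> None:
    start = time.monotonic()
    # ten queries run concurrently, so the total wait is ~0.1 s, not ~1 s
    results = await asyncio.gather(*(fake_db_query(i) for i in range(10)))
    elapsed = time.monotonic() - start
    print(len(results), elapsed < 0.5)


asyncio.run(main())
```

With blocking calls, the same ten queries would run in sequence and take roughly one second; the event loop lets a single thread interleave them.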

To test and validate the efficiency of async sessions before migrating complicated methods from sync to async, I implemented the utilities for creating async sessions and some async methods for loading variables and connections from the database, and I tried one of the newly implemented methods with the triggerer:

test_trigger.py:

from __future__ import annotations

from typing import Any

from airflow.triggers.base import BaseTrigger


class TestTrigger(BaseTrigger):

    def serialize(self) -> tuple[str, dict[str, Any]]:
        return "airflow.test_trigger.TestTrigger", {}

    async def run(self):
        import asyncio

        from airflow.models.connection import Connection

        for _ in range(1000):
            conn = await Connection.async_get_connection_from_secrets("airflow_db")
            self.log.info(conn.login)
            await asyncio.sleep(0.1)
        yield {}

dag.py:

from datetime import datetime
from airflow.models.dag import DAG
from airflow.models.baseoperator import BaseOperator
from airflow.test_trigger import TestTrigger


class TestOperator(BaseOperator):
    def __init__(self, param: int, **kwargs):
        super().__init__(**kwargs)
        self.param = param

    def execute(self, context):
        self.defer(
            trigger=TestTrigger(),
            method_name="execute_complete",
        )

    def execute_complete(self):
        self.log.info("Done!")


with DAG(
    dag_id="async_db",
    schedule_interval=None,
    start_date=datetime(2023, 12, 31),
    catchup=False,
) as dag:
    TestOperator.partial(task_id="test_task").expand(param=list(range(1000)))

I triggered a manual DAG run, which created 1000 mapped tasks and deferred them.

I got the expected result in the log (login root), without any blocking of the Triggerer's event loop and with reasonable resource consumption.

Then I tested again, replacing the trigger's run method with a version that uses the sync API:

    async def run(self):
        import asyncio

        from airflow.models.connection import Connection

        for _ in range(1000):
            conn = Connection.get_connection_from_secrets("airflow_db")
            self.log.info(conn.login)
            await asyncio.sleep(0.1)
        yield {}

I got the result in the task log, but the Triggerer log was full of:

[2023-12-31T01:42:36.934+0000] {triggerer_job_runner.py:598} INFO - trigger async_db/manual__2023-12-31T01:42:07.645143+00:00/test_task/235/1 (ID 471) starting
[2023-12-31T01:42:37.091+0000] {triggerer_job_runner.py:598} INFO - trigger async_db/manual__2023-12-31T01:42:07.645143+00:00/test_task/236/1 (ID 472) starting
[2023-12-31T01:42:37.159+0000] {triggerer_job_runner.py:576} INFO - Triggerer's async thread was blocked for 0.57 seconds, likely by a badly-written trigger. Set PYTHONASYNCIODEBUG=1 to get more information on overrunning coroutines.
[2023-12-31T01:42:37.369+0000] {triggerer_job_runner.py:598} INFO - trigger async_db/manual__2023-12-31T01:42:07.645143+00:00/test_task/237/1 (ID 473) starting
[2023-12-31T01:42:37.549+0000] {triggerer_job_runner.py:598} INFO - trigger async_db/manual__2023-12-31T01:42:07.645143+00:00/test_task/239/1 (ID 474) starting
[2023-12-31T01:42:37.772+0000] {triggerer_job_runner.py:576} INFO - Triggerer's async thread was blocked for 0.61 seconds, likely by a badly-written trigger. Set PYTHONASYNCIODEBUG=1 to get more information on overrunning coroutines.

and the Triggerer used almost all the resources of my computer.
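For comparison, the standard way to keep an event loop responsive around an unavoidable sync call is to push it into a worker thread. This is a generic asyncio sketch (the names `sync_db_call` and `heartbeat` are invented here), not the PR's implementation:

```python
import asyncio
import time


def sync_db_call() -> str:
    # stand-in for a blocking driver call (e.g. a sync DB query)
    time.sleep(0.2)
    return "root"


async def heartbeat(stop: asyncio.Event) -> int:
    # rough analogue of the Triggerer's watchdog: it should keep ticking
    ticks = 0
    while not stop.is_set():
        await asyncio.sleep(0.05)
        ticks += 1
    return ticks


async def main() -> None:
    stop = asyncio.Event()
    hb = asyncio.create_task(heartbeat(stop))
    # the blocking call runs in a worker thread, so the loop stays free
    login = await asyncio.to_thread(sync_db_call)
    stop.set()
    ticks = await hb
    print(login, ticks >= 2)


asyncio.run(main())
```

Calling sync_db_call directly inside the coroutine would reproduce the "async thread was blocked" warnings above, because nothing else can run on the loop while it sleeps.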

@potiuk
Member

potiuk commented Dec 31, 2023

I expect it will take a long time to go through and test all the cases, but I love it :).

But yeah, if we limit it to Connections and Variables for now, that would be cool. One general comment here: I am afraid not everyone will be able to use drivers that support async operations. There might be various limitations (dependencies, some specific corporate/managed DB configuration that is not supported), so, almost for sure, we should have a way to:

a) handle sync access
b) fail async access (from triggers) when async access is attempted and the driver does not support it (?)
c) possibly provide a way for Trigger developers to fall back to sync access (following #36492)

I think the cleanest solution is to explicitly recognise sync/async access to Connections/Variables, enable it via a configuration flag, for example, and have a method/way to check it from a Trigger.

@hussein-awala
Member Author

> I expect it will take a long time to go through and test all the cases, but I love it :).
>
> But yeah, if we limit it to Connections and Variables for now, that would be cool. One general comment here: I am afraid not everyone will be able to use drivers that support async operations. There might be various limitations (dependencies, some specific corporate/managed DB configuration that is not supported), so, almost for sure, we should have a way to:
>
> a) handle sync access
> b) fail async access (from triggers) when async access is attempted and the driver does not support it (?)
> c) possibly provide a way for Trigger developers to fall back to sync access (following #36492)
>
> I think the cleanest solution is to explicitly recognise sync/async access to Connections/Variables, enable it via a configuration flag, for example, and have a method/way to check it from a Trigger.

I have some ideas for testing the async drivers of each database in our CI in a short execution time (I will implement them soon).

For this point:

> possibly provide a way for Trigger developers to fall back to sync access

we have different options, such as falling back to a sync get when create_async_session raises certain exceptions; in that case, we can announce that this feature is only supported with some databases (for the rest, it will fall back to sync) and introduce it as an experimental feature to simplify future changes in the design.
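A rough shape of that fallback idea, with all names hypothetical (AsyncDriverUnavailable, create_async_session, and sync_get_connection are illustrative stand-ins, not the PR's API):

```python
import asyncio


class AsyncDriverUnavailable(Exception):
    """Hypothetical: raised when no async-capable DB driver is available."""


async def create_async_session():
    # stand-in that always fails, to exercise the fallback path
    raise AsyncDriverUnavailable


def sync_get_connection(conn_id: str) -> dict:
    # stand-in for the existing sync Connection.get_connection_from_secrets
    return {"conn_id": conn_id, "login": "root"}


async def get_connection(conn_id: str) -> dict:
    try:
        session = await create_async_session()
    except AsyncDriverUnavailable:
        # fallback: run the sync path in a worker thread so the
        # Triggerer's event loop is not blocked
        return await asyncio.to_thread(sync_get_connection, conn_id)
    # the real async query would go here; elided in this sketch
    raise NotImplementedError


print(asyncio.run(get_connection("airflow_db"))["login"])
```

The experimental-feature framing fits this shape well: the async path can be swapped in per database backend while the fallback keeps every deployment working.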

@hussein-awala hussein-awala added the full tests needed We need to run full set of tests for this PR to merge label Dec 31, 2023
Labels
area:core-operators, area:dev-tools, area:providers, area:secrets, full tests needed, pinned, provider:postgres
2 participants