Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic asyncio support #43944

Merged
merged 13 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions .github/workflows/basic-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,19 @@ jobs:
working-directory: ./clients/python
- name: "Install source version of required packages"
run: |
breeze release-management prepare-provider-packages fab standard common.sql --package-format \
wheel --skip-tag-check --version-suffix-for-pypi dev0
pip install . dist/apache_airflow_providers_fab-*.whl \
dist/apache_airflow_providers_standard-*.whl dist/apache_airflow_providers_common_sql-*.whl
breeze release-management prepare-provider-packages \
fab \
standard \
common.sql \
sqlite \
--package-format wheel \
--skip-tag-check \
--version-suffix-for-pypi dev0
pip install . \
dist/apache_airflow_providers_fab-*.whl \
dist/apache_airflow_providers_standard-*.whl \
dist/apache_airflow_providers_common_sql-*.whl \
dist/apache_airflow_providers_sqlite-*.whl
breeze release-management prepare-task-sdk-package --package-format wheel
pip install ./dist/apache_airflow_task_sdk-*.whl
- name: "Install Python client"
Expand Down
34 changes: 33 additions & 1 deletion airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import pluggy
from packaging.version import Version
from sqlalchemy import create_engine, exc, text
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.pool import NullPool

Expand Down Expand Up @@ -95,8 +96,17 @@
DONOT_MODIFY_HANDLERS: bool | None = None
DAGS_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("core", "DAGS_FOLDER"))

AIO_LIBS_MAPPING = {"sqlite": "aiosqlite", "postgresql": "asyncpg", "mysql": "aiomysql"}
"""
Mapping of sync scheme to async scheme.

:meta private:
"""

engine: Engine
Session: Callable[..., SASession]
async_engine: AsyncEngine
create_async_session: Callable[..., AsyncSession]

# The JSON library to use for DAG Serialization and De-Serialization
json = json
Expand Down Expand Up @@ -199,13 +209,25 @@ def load_policy_plugins(pm: pluggy.PluginManager):
pm.load_setuptools_entrypoints("airflow.policy")


def _get_async_conn_uri_from_sync(sync_uri):
scheme, rest = sync_uri.split(":", maxsplit=1)
dstandish marked this conversation as resolved.
Show resolved Hide resolved
scheme = scheme.split("+", maxsplit=1)[0]
aiolib = AIO_LIBS_MAPPING.get(scheme)
if aiolib:
return f"{scheme}+{aiolib}:{rest}"
else:
return sync_uri


def configure_vars():
"""Configure Global Variables from airflow.cfg."""
global SQL_ALCHEMY_CONN
global SQL_ALCHEMY_CONN_ASYNC
global DAGS_FOLDER
global PLUGINS_FOLDER
global DONOT_MODIFY_HANDLERS
SQL_ALCHEMY_CONN = conf.get("database", "SQL_ALCHEMY_CONN")
SQL_ALCHEMY_CONN_ASYNC = _get_async_conn_uri_from_sync(sync_uri=SQL_ALCHEMY_CONN)

DAGS_FOLDER = os.path.expanduser(conf.get("core", "DAGS_FOLDER"))

Expand Down Expand Up @@ -441,6 +463,9 @@ def configure_orm(disable_connection_pool=False, pool_class=None):

global Session
global engine
global async_engine
global create_async_session
dstandish marked this conversation as resolved.
Show resolved Hide resolved

if os.environ.get("_AIRFLOW_SKIP_DB_TESTS") == "true":
# Skip DB initialization in unit tests, if DB tests are skipped
Session = SkipDBTestsSession
Expand All @@ -466,7 +491,14 @@ def configure_orm(disable_connection_pool=False, pool_class=None):
connect_args["check_same_thread"] = False

engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args, future=True)

async_engine = create_async_engine(SQL_ALCHEMY_CONN_ASYNC, future=True)
create_async_session = sessionmaker(
bind=async_engine,
autocommit=False,
autoflush=False,
class_=AsyncSession,
expire_on_commit=False,
)
mask_secret(engine.url.password)

setup_event_handlers(engine)
Expand Down
2 changes: 2 additions & 0 deletions dev/breeze/tests/test_packages.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ def test_get_documentation_package_path():
"""
"apache-airflow-providers-common-sql>=1.20.0b0",
"apache-airflow>=2.8.0b0",
"asyncpg>=0.30.0",
"psycopg2-binary>=2.9.4",
""",
id="beta0 suffix postgres",
Expand All @@ -221,6 +222,7 @@ def test_get_documentation_package_path():
"""
"apache-airflow-providers-common-sql>=1.20.0",
"apache-airflow>=2.8.0",
"asyncpg>=0.30.0",
dstandish marked this conversation as resolved.
Show resolved Hide resolved
"psycopg2-binary>=2.9.4",
""",
id="No suffix postgres",
Expand Down
3 changes: 3 additions & 0 deletions generated/provider_dependencies.json
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,7 @@
},
"mysql": {
"deps": [
"aiomysql>=0.2.0",
"apache-airflow-providers-common-sql>=1.20.0",
"apache-airflow>=2.8.0",
"mysql-connector-python>=8.0.29",
Expand Down Expand Up @@ -1085,6 +1086,7 @@
"deps": [
"apache-airflow-providers-common-sql>=1.20.0",
"apache-airflow>=2.8.0",
"asyncpg>=0.30.0",
"psycopg2-binary>=2.9.4"
],
"devel-deps": [],
Expand Down Expand Up @@ -1260,6 +1262,7 @@
},
"sqlite": {
"deps": [
"aiosqlite>=0.20.0",
"apache-airflow-providers-common-sql>=1.20.0",
"apache-airflow>=2.8.0"
],
Expand Down
1 change: 1 addition & 0 deletions providers/src/airflow/providers/mysql/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ dependencies:
# Instead, if someone attempts to use it on MacOS, they will get explanatory error on how to install it
- mysqlclient>=1.4.0; sys_platform != 'darwin'
- mysql-connector-python>=8.0.29
- aiomysql>=0.2.0

additional-extras:
# only needed for backwards compatibility
Expand Down
1 change: 1 addition & 0 deletions providers/src/airflow/providers/postgres/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ dependencies:
- apache-airflow>=2.8.0
- apache-airflow-providers-common-sql>=1.20.0
- psycopg2-binary>=2.9.4
- asyncpg>=0.30.0

additional-extras:
- name: amazon
Expand Down
1 change: 1 addition & 0 deletions providers/src/airflow/providers/sqlite/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ versions:

dependencies:
- apache-airflow>=2.8.0
- aiosqlite>=0.20.0
- apache-airflow-providers-common-sql>=1.20.0

integrations:
Expand Down
2 changes: 1 addition & 1 deletion scripts/ci/kubernetes/k8s_requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
-e .[devel-devscripts,devel-tests,cncf.kubernetes]
-e .[devel-devscripts,devel-tests,cncf.kubernetes,sqlite]
-e ./providers
-e ./task_sdk
12 changes: 12 additions & 0 deletions tests/utils/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
from __future__ import annotations

import pytest
from sqlalchemy import select

from airflow.models import Log
from airflow.utils.session import provide_session

pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode]
Expand Down Expand Up @@ -53,3 +55,13 @@ def test_provide_session_with_kwargs(self):

session = object()
assert wrapper(session=session) is session

@pytest.mark.asyncio
async def test_async_session(self):
from airflow.settings import create_async_session

session = create_async_session()
session.add(Log(event="hihi1234"))
await session.commit()
l = await session.scalar(select(Log).where(Log.event == "hihi1234").limit(1)) # noqa: E741
dstandish marked this conversation as resolved.
Show resolved Hide resolved
assert l.event == "hihi1234"
dstandish marked this conversation as resolved.
Show resolved Hide resolved