Skip to content

Commit

Permalink
Add OpenLineage support for PostgresOperator.
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Dardzinski <[email protected]>
  • Loading branch information
JDarDagran committed Jun 3, 2023
1 parent 6c9d9cf commit da3f29c
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 5 deletions.
18 changes: 18 additions & 0 deletions airflow/providers/postgres/hooks/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,3 +310,21 @@ def _generate_insert_sql(
sql += f"{on_conflict_str} DO NOTHING"

return sql

def get_database_info(self, connection):
    """
    Build the ``DatabaseInfo`` object describing ``connection``.

    The ``scheme`` is taken from :meth:`get_database_dialect` and the
    ``authority`` from ``self._get_authority``; both receive ``connection``.

    :param connection: connection object to describe
    :return: a ``DatabaseInfo`` instance
    """
    # Imported inside the method rather than at module level
    # (presumably so the openlineage provider stays optional -- TODO confirm).
    from airflow.providers.openlineage.sqlparser import DatabaseInfo

    dialect = self.get_database_dialect(connection)
    authority = self._get_authority(connection)
    return DatabaseInfo(scheme=dialect, authority=authority)

def get_database_dialect(self, connection):
    """
    Return the dialect name used for SQL parsing.

    This is a naive implementation: it simply hands back the
    ``conn_type`` attribute of ``connection`` unchanged.

    :param connection: object exposing a ``conn_type`` attribute
    :return: the value of ``connection.conn_type``
    """
    dialect = connection.conn_type
    return dialect

def get_default_schema(self):
    """
    Return the default schema specific to this database.

    The value is read directly from ``self.database``. It is intended for
    the OpenLineage SQL parser (see ``airflow.providers.openlineage.sqlparser``).

    :return: the current value of ``self.database``
    """
    default_schema = self.database
    return default_schema
8 changes: 4 additions & 4 deletions dev/breeze/tests/test_selective_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
"tests/providers/postgres/file.py",
),
{
"affected-providers-list-as-string": "amazon common.sql google postgres",
"affected-providers-list-as-string": "amazon common.sql google openlineage postgres",
"all-python-versions": "['3.8']",
"all-python-versions-list-as-string": "3.8",
"python-versions": "['3.8']",
Expand All @@ -109,7 +109,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
"docs-build": "true",
"upgrade-to-newer-dependencies": "false",
"parallel-test-types-list-as-string": "Providers[amazon] "
"API Always Providers[common.sql,postgres] Providers[google]",
"API Always Providers[common.sql,openlineage,postgres] Providers[google]",
},
id="API and providers tests and docs should run",
)
Expand Down Expand Up @@ -163,7 +163,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
"tests/providers/postgres/file.py",
),
{
"affected-providers-list-as-string": "amazon common.sql google postgres",
"affected-providers-list-as-string": "amazon common.sql google openlineage postgres",
"all-python-versions": "['3.8']",
"all-python-versions-list-as-string": "3.8",
"python-versions": "['3.8']",
Expand All @@ -176,7 +176,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
"run-kubernetes-tests": "true",
"upgrade-to-newer-dependencies": "false",
"parallel-test-types-list-as-string": "Providers[amazon] "
"Always Providers[common.sql,postgres] Providers[google]",
"Always Providers[common.sql,openlineage,postgres] Providers[google]",
},
id="Helm tests, providers (both upstream and downstream),"
"kubernetes tests and docs should run",
Expand Down
3 changes: 2 additions & 1 deletion generated/provider_dependencies.json
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,8 @@
],
"cross-providers-deps": [
"amazon",
"common.sql"
"common.sql",
"openlineage"
],
"excluded-python-versions": []
},
Expand Down
21 changes: 21 additions & 0 deletions tests/providers/postgres/operators/test_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,24 @@ def test_runtime_parameter_setting(self):
runtime_parameters={"statement_timeout": "3000ms"},
)
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

def test_postgres_operator_openlineage(self):
    """
    Check the OpenLineage facets reported by ``PostgresOperator``.

    Before the operator runs, no input or output datasets are expected.
    After the ``CREATE TABLE`` statement has run, exactly one output
    dataset with a ``schema`` facet is expected.
    """
    create_table_sql = (
        "\n"
        "CREATE TABLE IF NOT EXISTS test_airflow (\n"
        "dummy VARCHAR(50)\n"
        ");\n"
    )
    operator = PostgresOperator(task_id="basic_postgres", sql=create_table_sql, dag=self.dag)

    # Facets collected before the statement has been executed.
    facets_before_run = operator.get_openlineage_facets_on_start()
    assert len(facets_before_run.inputs) == 0
    assert len(facets_before_run.outputs) == 0

    operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

    # OpenLineage provider runs same method on complete by default,
    # so the "on start" hook is deliberately called a second time here.
    facets_after_run = operator.get_openlineage_facets_on_start()
    assert len(facets_after_run.inputs) == 0
    assert len(facets_after_run.outputs) == 1

    created_table = facets_after_run.outputs[0]
    assert created_table.namespace == "postgres://postgres:None"
    assert created_table.name == "public.test_airflow"
    assert "schema" in created_table.facets

0 comments on commit da3f29c

Please sign in to comment.