add OpenLineage support to S3ToRedshiftOperator #41422

Closed
Changes from 2 commits
Commits (72)
a1ba7da
add OpenLineage support to S3ToRedshiftOperator
Artuz37 Aug 9, 2024
2855d9a
add tests
Artuz37 Aug 13, 2024
ce8990f
Update airflow/providers/amazon/aws/transfers/s3_to_redshift.py
Artuz37 Aug 13, 2024
b5813cf
rename open_lineage.py to openlineage.py
Artuz37 Aug 14, 2024
4d82e4d
remove isinstance occurences for hook type checking in s3ToRedshiftOp…
Artuz37 Aug 14, 2024
cdbfe82
add tests to aws/utils/openlineage.py
Artuz37 Aug 16, 2024
feb51f0
Move wrongly placed newsfragment (#41435)
vincbeck Aug 13, 2024
2bf4d20
Fix tests/models/test_variable.py for database isolation mode (#41414)
jscheffl Aug 13, 2024
eee6c1a
Remove deprecated SubDags (#41390)
kaxil Aug 13, 2024
c0b35e8
Rename @span decorator to @add_span to avoid collisions (#41444)
dstandish Aug 13, 2024
6d299a6
Skipping tests for db isolation because similar tests were skipped (#…
bugraoz93 Aug 13, 2024
8801d11
Issue-41243 Fix the Exception name and unpin dependency (#41256)
vikramaditya91 Aug 13, 2024
b927676
ydb provider: add database to table name in bulk upsert, use bulk ups…
uzhastik Aug 14, 2024
ba5ad83
Send context using in venv operator (#41363)
phi-friday Aug 14, 2024
0d4132e
Properly implement termination grace period seconds (#41374)
eakmanrq Aug 14, 2024
b06e8dc
Temporarily disable doc publishing waiting for ASF self-hosted runne…
potiuk Aug 14, 2024
058db4d
Remove deprecated and unused methods / properties on DAG (#41440)
dstandish Aug 14, 2024
8c00fbb
Remove support for deprecated imports like operators/hooks/sensors (#…
jedcunningham Aug 14, 2024
1eab95d
Support templated hostname in NOTES (#41423)
romsharon98 Aug 14, 2024
d34adf5
Check breaking changes have a news fragment (#41443)
uranusjr Aug 14, 2024
65914e2
Rephrase max_tis_per_query config docs (#41448)
RNHTTR Aug 14, 2024
817bb92
Add 2.10.0rc1 to issue template (#41459)
ephraimbuddy Aug 14, 2024
e4a30a0
Update Airflow version to `3.0.0.dev0` (#41456)
jedcunningham Aug 14, 2024
9378fe2
fix: rm deprecated import (#41461)
phi-friday Aug 14, 2024
61d6d3d
Allow 3.x Docker images (#41466)
uranusjr Aug 14, 2024
706e515
Describe behaviour in docstring correctly (#41458)
BasPH Aug 14, 2024
54ea557
feat(docker): Replace `use_dill` with `serializer` (#41356)
phi-friday Aug 14, 2024
4465c08
Export Azure Container Instance log messages to XCOM (#41142)
perry2of5 Aug 14, 2024
90fe944
Deprecate implicit default DAG schedule (#41321)
uranusjr Aug 14, 2024
0ade7a9
Enable pull requests to be run from v*test branches (#41474)
potiuk Aug 14, 2024
7fd3eaf
Microsoft Power BI operator to refresh the dataset (#40356)
ambika-garg Aug 14, 2024
bb04a8b
Prevent provider lowest-dependency tests to run in non-main branch (#…
potiuk Aug 14, 2024
d9fd909
Skip database isolation case for task mapping taskinstance tests (#4…
potiuk Aug 14, 2024
653629b
Make PROD image building works in non-main PRs (#41480)
potiuk Aug 14, 2024
26fd799
Bump axios from 1.6.1 to 1.7.4 in /airflow/www (#41451)
dependabot[bot] Aug 14, 2024
22d05c1
Field Deletion Warning when editing Connections (#41144)
lh5844 Aug 14, 2024
d5a1bd3
Fix news fragment check conditional (#41491)
uranusjr Aug 15, 2024
2d67255
Fix try selector refresh (#41483)
bbovenzi Aug 15, 2024
06f5af1
Fix: Pass hook parameters to SnowflakeSqlApiHook and prep them for AP…
BTeclaw Aug 15, 2024
b218c8f
Fix Kinesis Analytics test (#41489)
ferruzzi Aug 15, 2024
4ea24cd
fix: get task dependencies without serializing task tree to string (#…
mobuchowski Aug 15, 2024
1f5210a
Mark TestSparkKubernetes test as db test (#41500)
potiuk Aug 15, 2024
da73dbb
Fix Non-DB test calculation for main builds (#41499)
potiuk Aug 15, 2024
037e3bc
Remove deprecated code is AWS provider (#41407)
vincbeck Aug 15, 2024
56fb450
Add incremental export and cross account export functionality in `Dyn…
Ghoul-SSZ Aug 15, 2024
ae2048b
Fix variable interpolation in Airflow core release guide (#41510)
jedcunningham Aug 15, 2024
47e5d34
fix: docker decorator pyi signature (#41509)
benbenbang Aug 15, 2024
a2640ea
Partial fix for example_dynamodb_to_s3.py (#41517)
ferruzzi Aug 16, 2024
2fdc824
Fixing user_confirmation for quit action for prepare-provider-documen…
amoghrajesh Aug 16, 2024
8a0f273
Update version_added field for config that are released in 2.10 (#41530)
utkarsharma2 Aug 16, 2024
3555777
Removed deprecated param from local_filesystem (#41533)
gopidesupavan Aug 16, 2024
1571b2c
Fix failing pydantic v1 tests (#41534)
potiuk Aug 16, 2024
9203881
Incorrect try number subtraction producing invalid span id for OTEL a…
howardyoo Aug 16, 2024
e19a754
Util helper deprecated functions removal (#41520)
dirrao Aug 16, 2024
0b709b4
Airflow 3: airflow date utils date_range and days_ago deprecations re…
dirrao Aug 16, 2024
3569838
Delete experimental API (#41434)
vincbeck Aug 16, 2024
e27912d
Refactor DataprocCreateBatchOperator and Dataproc system tests (#41527)
moiseenkov Aug 16, 2024
ced5d36
Airflow 3: store_serialized_dags deprecated parameter usage removal (…
dirrao Aug 16, 2024
6fb650b
Skip task in example_dynamodb_to_s3.py (#41546)
ferruzzi Aug 16, 2024
3ed47e7
Fix `ElasticsearchSQLHook` fails with `AttributeError: __enter__` (#4…
Owen-CH-Leung Aug 17, 2024
bc93fb8
Fix `AwsTaskLogFetcher` missing logs (#41515)
vincbeck Aug 17, 2024
9f3524f
restore python 3.12 support for papermill (#41548)
morokosi Aug 17, 2024
30a9781
Chart: Default airflow version to 2.10.0 (#41529)
utkarsharma2 Aug 17, 2024
012f878
Fix changelog template for new providers w/o relevant commits (#41566)
jscheffl Aug 18, 2024
e24259c
Support pre-versions in version handling (#41565)
jscheffl Aug 18, 2024
29391ee
airflow deprecated settings session_lifetime_days, force_log_out_afte…
dirrao Aug 18, 2024
2e56935
smtp email user and password deprecated config removal (#41539)
dirrao Aug 18, 2024
bed7b14
base excutor deprecated unused validate_command function removal (#41…
dirrao Aug 18, 2024
0bec0ee
Remove debian bullseye support (#41568)
potiuk Aug 18, 2024
24bc4ce
Cleanup installed packages when running provider compatibility tests …
potiuk Aug 18, 2024
0426564
Airflow 3: processor_poll_interval deprecated parameter usage removal…
dirrao Aug 18, 2024
7ff7991
Prepare docs for Aug 2nd wave of providers (#41559)
eladkal Aug 19, 2024
86 changes: 81 additions & 5 deletions airflow/providers/amazon/aws/transfers/s3_to_redshift.py
@@ -134,15 +134,17 @@ def _build_copy_query(
            {copy_options};
        """

    def _create_hook(self) -> RedshiftDataHook | RedshiftSQLHook:
"""If redshift_data_api_kwargs are provided, create RedshiftDataHook. RedshiftSQLHook otherwise."""
        if self.redshift_data_api_kwargs:
            return RedshiftDataHook(aws_conn_id=self.redshift_conn_id)
        return RedshiftSQLHook(redshift_conn_id=self.redshift_conn_id)

    def execute(self, context: Context) -> None:
        if self.method not in AVAILABLE_METHODS:
            raise AirflowException(f"Method not found! Available methods: {AVAILABLE_METHODS}")

        redshift_hook: RedshiftDataHook | RedshiftSQLHook
        if self.redshift_data_api_kwargs:
            redshift_hook = RedshiftDataHook(aws_conn_id=self.redshift_conn_id)
        else:
            redshift_hook = RedshiftSQLHook(redshift_conn_id=self.redshift_conn_id)
        redshift_hook = self._create_hook()
        conn = S3Hook.get_connection(conn_id=self.aws_conn_id) if self.aws_conn_id else None
        region_info = ""
        if conn and conn.extra_dejson.get("region", False):
@@ -197,3 +199,77 @@ def execute(self, context: Context) -> None:
        else:
            redshift_hook.run(sql, autocommit=self.autocommit)
        self.log.info("COPY command complete...")

    def get_openlineage_facets_on_complete(self, task_instance):
        """Implement on_complete as we will query the destination table."""
        from pathlib import Path

        from airflow.providers.amazon.aws.utils.open_lineage import (
            get_facets_from_redshift_table,
            get_identity_column_lineage_facet,
        )
        from airflow.providers.common.compat.openlineage.facet import (
            Dataset,
            Identifier,
            LifecycleStateChange,
            LifecycleStateChangeDatasetFacet,
            SymlinksDatasetFacet,
        )
        from airflow.providers.openlineage.extractors import OperatorLineage

        redshift_hook = self._create_hook()
        if isinstance(redshift_hook, RedshiftDataHook):
            database = self.redshift_data_api_kwargs.get("database")
            identifier = self.redshift_data_api_kwargs.get(
                "cluster_identifier"
            ) or self.redshift_data_api_kwargs.get("workgroup_name")
            port = self.redshift_data_api_kwargs.get("port", "5439")
            authority = f"{identifier}.{redshift_hook.region_name}:{port}"
        else:
            database = redshift_hook.conn.schema
            authority = redshift_hook.get_openlineage_database_info(redshift_hook.conn).authority

        output_dataset_facets = get_facets_from_redshift_table(
            redshift_hook, self.table, self.redshift_data_api_kwargs, self.schema
        )

        input_dataset_facets = {}
        if not self.column_list:
            # If column_list is not specified, the input file must match the columns of the output table.
            input_dataset_facets["schema"] = output_dataset_facets["schema"]

        dataset_name = self.s3_key
        if "*" in dataset_name:
            # If a wildcard ("*") is used in the s3 path, we want the dataset name to be the directory name,
            # but we create a symlink to the full object path containing the wildcard.
            input_dataset_facets["symlink"] = SymlinksDatasetFacet(
                identifiers=[Identifier(namespace=f"s3://{self.s3_bucket}", name=dataset_name, type="file")]
            )
            dataset_name = Path(dataset_name).parent.as_posix()
            if dataset_name == ".":
                # The blob path has no leading slash, but the root dataset name needs to be "/".
                dataset_name = "/"

        input_dataset = Dataset(
            namespace=f"s3://{self.s3_bucket}",
            name=dataset_name,
            facets=input_dataset_facets,
        )

        output_dataset_facets["columnLineage"] = get_identity_column_lineage_facet(
            field_names=[field.name for field in output_dataset_facets["schema"].fields],
            input_datasets=[input_dataset],
        )

        if self.method == "REPLACE":
            output_dataset_facets["lifecycleStateChange"] = LifecycleStateChangeDatasetFacet(
                lifecycleStateChange=LifecycleStateChange.OVERWRITE
            )

        output_dataset = Dataset(
            namespace=f"redshift://{authority}",
            name=f"{database}.{self.schema}.{self.table}",
            facets=output_dataset_facets,
        )

        return OperatorLineage(inputs=[input_dataset], outputs=[output_dataset])
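
For context on the wildcard handling above, here is a minimal standalone sketch of the S3 dataset-naming rule: a wildcard key is reduced to its parent directory (or "/" at the bucket root), while the full wildcard path is kept for the SymlinksDatasetFacet. The helper name s3_dataset_name and the example keys are illustrative only and are not part of this PR.

# Illustrative sketch of the naming rule above; s3_dataset_name is not part of the PR.
from __future__ import annotations

from pathlib import Path


def s3_dataset_name(key: str) -> tuple[str, str | None]:
    """Return (dataset_name, symlink_name) for an S3 key, mirroring the operator logic above."""
    symlink_name = None
    dataset_name = key
    if "*" in key:
        # The full wildcard path is kept only as a symlink identifier.
        symlink_name = key
        dataset_name = Path(key).parent.as_posix()
        if dataset_name == ".":
            # Keys have no leading slash, so a bucket-root wildcard maps to "/".
            dataset_name = "/"
    return dataset_name, symlink_name


# Example keys (hypothetical):
assert s3_dataset_name("data/2024/*.csv") == ("data/2024", "data/2024/*.csv")
assert s3_dataset_name("*.csv") == ("/", "*.csv")
assert s3_dataset_name("data/file.csv") == ("data/file.csv", None)
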
136 changes: 136 additions & 0 deletions airflow/providers/amazon/aws/utils/open_lineage.py
@@ -0,0 +1,136 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING, Any

from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook
from airflow.providers.common.compat.openlineage.facet import (
    ColumnLineageDatasetFacet,
    DocumentationDatasetFacet,
    Fields,
    InputField,
    SchemaDatasetFacet,
    SchemaDatasetFacetFields,
)

if TYPE_CHECKING:
    from airflow.providers.amazon.aws.hooks.redshift_data import RedshiftDataHook


def get_facets_from_redshift_table(
    redshift_hook: RedshiftDataHook | RedshiftSQLHook,
    table: str,
    redshift_data_api_kwargs: dict,
    schema: str = "public",
) -> dict[Any, Any]:
    """
    Query Redshift for table metadata.

    A SchemaDatasetFacet and a DocumentationDatasetFacet (if the table has a description) will be created.
    """
sql = f"""
SELECT
cols.column_name,
cols.data_type,
col_des.description as column_description,
tbl_des.description as table_description
FROM
information_schema.columns cols
LEFT JOIN
pg_catalog.pg_description col_des
ON
cols.ordinal_position = col_des.objsubid
AND col_des.objoid = (SELECT oid FROM pg_class WHERE relnamespace =
(SELECT oid FROM pg_namespace WHERE nspname = cols.table_schema) AND relname = cols.table_name)
LEFT JOIN
pg_catalog.pg_class tbl
ON
tbl.relname = cols.table_name
AND tbl.relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = cols.table_schema)
LEFT JOIN
pg_catalog.pg_description tbl_des
ON
tbl.oid = tbl_des.objoid
AND tbl_des.objsubid = 0
WHERE
cols.table_name = '{table}'
AND cols.table_schema = '{schema}';
"""
    if isinstance(redshift_hook, RedshiftSQLHook):
        records = redshift_hook.get_records(sql)
        if records:
            table_description = records[0][-1]  # Assuming the table description is the same for all rows
        else:
            table_description = None
        documentation = DocumentationDatasetFacet(description=table_description or "")
        table_schema = SchemaDatasetFacet(
            fields=[
                SchemaDatasetFacetFields(name=field[0], type=field[1], description=field[2])
                for field in records
            ]
        )
    else:
        statement_id = redshift_hook.execute_query(sql=sql, poll_interval=1, **redshift_data_api_kwargs)
        response = redshift_hook.conn.get_statement_result(Id=statement_id)

        table_schema = SchemaDatasetFacet(
            fields=[
                SchemaDatasetFacetFields(
                    name=field[0]["stringValue"],
                    type=field[1]["stringValue"],
                    description=field[2].get("stringValue"),
                )
                for field in response["Records"]
            ]
        )
        # The table description is the same for all rows, so we retrieve it from the first one.
        documentation = DocumentationDatasetFacet(
            description=response["Records"][0][3].get("stringValue") or ""
        )

return {"schema": table_schema, "documentation": documentation}


def get_identity_column_lineage_facet(
    field_names,
    input_datasets,
) -> ColumnLineageDatasetFacet:
    """
    Get column lineage facet.

    Simple lineage will be created, where each source column corresponds to a single destination column
    in each input dataset, with no transformations applied.
    """
    if field_names and not input_datasets:
        raise ValueError("When providing `field_names` you must provide at least one `input_dataset`.")

    column_lineage_facet = ColumnLineageDatasetFacet(
        fields={
            field: Fields(
                inputFields=[
                    InputField(namespace=dataset.namespace, name=dataset.name, field=field)
                    for dataset in input_datasets
                ],
                transformationType="IDENTITY",
                transformationDescription="identical",
            )
            for field in field_names
        }
    )
    return column_lineage_facet
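
A hedged usage sketch of the two helpers added in this module; the connection id, bucket, and table names are placeholders, and the import path follows the open_lineage.py module name used at this point in the PR (a later commit renames it to openlineage.py). Only the RedshiftSQLHook branch is exercised here.

# Illustrative only: "redshift_default", "my-bucket", and "public.my_table" are placeholders.
from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook
from airflow.providers.amazon.aws.utils.open_lineage import (
    get_facets_from_redshift_table,
    get_identity_column_lineage_facet,
)
from airflow.providers.common.compat.openlineage.facet import Dataset

hook = RedshiftSQLHook(redshift_conn_id="redshift_default")

# SchemaDatasetFacet + DocumentationDatasetFacet queried from the destination table.
output_facets = get_facets_from_redshift_table(
    redshift_hook=hook,
    table="my_table",
    redshift_data_api_kwargs={},  # empty dict: the SQL-based branch is taken
    schema="public",
)

# Identity (1:1) column lineage from a single S3 input dataset.
source = Dataset(namespace="s3://my-bucket", name="data/2024")
output_facets["columnLineage"] = get_identity_column_lineage_facet(
    field_names=[field.name for field in output_facets["schema"].fields],
    input_datasets=[source],
)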