Implement transfer for GCS/S3 to BigQuery #1794

Closed · wants to merge 22 commits

Commits (22)
9055e23  Add the DatabaseDataProvider and SqliteDataProvider (sunank200, Feb 17, 2023)
eb42e68  Add properties to methods (sunank200, Feb 17, 2023)
56de166  Add openlineage_dataset_uri (sunank200, Feb 17, 2023)
ea9ee17  Fix the dependency (sunank200, Feb 17, 2023)
b85fbb5  Add SnowflakeDataProvider with example DAG (sunank200, Feb 17, 2023)
7296fe9  Fix review comment (rajaths010494, Feb 22, 2023)
639ef26  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], Feb 22, 2023)
f2db0b1  nit: update docstring (rajaths010494, Feb 22, 2023)
73d4254  Merge branch 'SnowflakeDatabaseDataProvider' of https://github.com/as… (rajaths010494, Feb 22, 2023)
3658eca  add docstring (rajaths010494, Feb 22, 2023)
6389d02  transfer implementation for GCS/s3 to BigQuery (rajaths010494, Feb 23, 2023)
5cd5002  Add read for bigquery using local file (rajaths010494, Mar 1, 2023)
bb9f2bc  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], Mar 1, 2023)
c78d6c7  Address review comment (rajaths010494, Mar 1, 2023)
421c8cb  Merge branch 'bigquweydataprovider' of https://github.com/astronomer/… (rajaths010494, Mar 1, 2023)
66ff416  Remove commented code (rajaths010494, Mar 1, 2023)
bcd23e6  add example dag for sqlite (rajaths010494, Mar 2, 2023)
f3fc53a  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], Mar 2, 2023)
b873eac  Add basic tests (rajaths010494, Mar 2, 2023)
95c9e40  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], Mar 2, 2023)
31bb17b  add tests (rajaths010494, Mar 2, 2023)
61f58a8  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], Mar 2, 2023)
@@ -3,7 +3,7 @@

from airflow import DAG

from universal_transfer_operator.constants import TransferMode
from universal_transfer_operator.constants import FileType, TransferMode
from universal_transfer_operator.datasets.file.base import File
from universal_transfer_operator.datasets.table import Metadata, Table
from universal_transfer_operator.integrations.fivetran import Connector, Destination, FiveTranOptions, Group
@@ -30,6 +30,75 @@
    ),
)

transfer_non_native_s3_to_sqlite = UniversalTransferOperator(
    task_id="transfer_non_native_s3_to_sqlite",
    source_dataset=File(
        path="s3://astro-sdk-test/uto/csv_files/", conn_id="aws_default", filetype=FileType.CSV
    ),
    destination_dataset=Table(name="uto_s3_table", conn_id="sqlite_default"),
)

transfer_non_native_gs_to_sqlite = UniversalTransferOperator(
    task_id="transfer_non_native_gs_to_sqlite",
    source_dataset=File(
        path="gs://uto-test/uto/csv_files/", conn_id="google_cloud_default", filetype=FileType.CSV
    ),
    destination_dataset=Table(name="uto_gs_table", conn_id="sqlite_default"),
)

transfer_non_native_s3_to_snowflake = UniversalTransferOperator(
    task_id="transfer_non_native_s3_to_snowflake",
    source_dataset=File(
        path="s3://astro-sdk-test/uto/csv_files/", conn_id="aws_default", filetype=FileType.CSV
    ),
    destination_dataset=Table(name="uto_s3_table", conn_id="snowflake_default"),
)

transfer_non_native_gs_to_snowflake = UniversalTransferOperator(
    task_id="transfer_non_native_gs_to_snowflake",
    source_dataset=File(
        path="gs://uto-test/uto/csv_files/", conn_id="google_cloud_default", filetype=FileType.CSV
    ),
    destination_dataset=Table(name="uto_gs_table", conn_id="snowflake_default"),
)

transfer_non_native_gs_to_bigquery = UniversalTransferOperator(
    task_id="transfer_non_native_gs_to_bigquery",
    source_dataset=File(path="gs://uto-test/uto/homes_main.csv", conn_id="google_cloud_default"),
    destination_dataset=Table(
        name="uto_gs_to_bigquery_table", conn_id="google_cloud_default", metadata=Metadata(schema="astro")
    ),
)

transfer_non_native_s3_to_bigquery = UniversalTransferOperator(
    task_id="transfer_non_native_s3_to_bigquery",
    source_dataset=File(
        path="s3://astro-sdk-test/uto/csv_files/", conn_id="aws_default", filetype=FileType.CSV
    ),
    destination_dataset=Table(
        name="uto_s3_to_bigquery_table", conn_id="google_cloud_default", metadata=Metadata(schema="astro")
    ),
)

transfer_non_native_bigquery_to_snowflake = UniversalTransferOperator(
    task_id="transfer_non_native_bigquery_to_snowflake",
    source_dataset=Table(
        name="uto_s3_to_bigquery_table", conn_id="google_cloud_default", metadata=Metadata(schema="astro")
    ),
    destination_dataset=Table(
        name="uto_bigquery_to_snowflake_table",
        conn_id="snowflake_default",
    ),
)

transfer_non_native_bigquery_to_sqlite = UniversalTransferOperator(
    task_id="transfer_non_native_bigquery_to_sqlite",
    source_dataset=Table(
        name="uto_s3_to_bigquery_table", conn_id="google_cloud_default", metadata=Metadata(schema="astro")
    ),
    destination_dataset=Table(name="uto_bigquery_to_sqlite_table", conn_id="sqlite_default"),
)

transfer_fivetran_with_connector_id = UniversalTransferOperator(
    task_id="transfer_fivetran_with_connector_id",
    source_dataset=File(path="s3://astro-sdk-test/uto/", conn_id="aws_default"),
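For orientation, the new transfer tasks above are plain operator instances; a minimal sketch of how one of them could be wired into a DAG follows. The dag_id, start_date, and schedule are illustrative assumptions, and the operator import path follows the package layout rather than anything shown in this diff.

from datetime import datetime

from airflow import DAG

from universal_transfer_operator.datasets.file.base import File
from universal_transfer_operator.datasets.table import Metadata, Table
from universal_transfer_operator.universal_transfer_operator import UniversalTransferOperator

# Hypothetical wrapper DAG around one of the new transfers; all DAG arguments are assumptions.
with DAG(dag_id="example_gs_to_bigquery", start_date=datetime(2023, 1, 1), schedule=None):
    UniversalTransferOperator(
        task_id="transfer_non_native_gs_to_bigquery",
        source_dataset=File(path="gs://uto-test/uto/homes_main.csv", conn_id="google_cloud_default"),
        destination_dataset=Table(
            name="uto_gs_to_bigquery_table", conn_id="google_cloud_default", metadata=Metadata(schema="astro")
        ),
    )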
10 changes: 8 additions & 2 deletions universal_transfer_operator/pyproject.toml
@@ -54,7 +54,10 @@ google = [
snowflake = [
    "apache-airflow-providers-snowflake",
    "snowflake-sqlalchemy>=1.2.0",
    "snowflake-connector-python[pandas]",
    "snowflake-connector-python[pandas]<3.0.0",
    # pinning snowflake-connector-python[pandas]<3.0.0 due to a conflict between the snowflake-connector-python/pyarrow/google
    # packages and the pandas-gbq/google packages, which forces pandas-gbq 0.13.2 to be installed; that version is not
    # compatible with pandas 1.5.3
]

amazon = [
@@ -72,7 +75,10 @@ all = [
"apache-airflow-providers-google>=6.4.0",
"apache-airflow-providers-snowflake",
"smart-open[all]>=5.2.1",
"snowflake-connector-python[pandas]",
"snowflake-connector-python[pandas]<3.0.0",
# pinning snowflake-connector-python[pandas]<3.0.0 due to a conflict in snowflake-connector-python/pyarrow/google
# packages and pandas-gbq/google packages which is forcing pandas-gbq of version 0.13.2 installed, which is not
# compatible with pandas 1.5.3
"snowflake-sqlalchemy>=1.2.0",
"sqlalchemy-bigquery>=1.3.0",
"s3fs",
@@ -99,3 +99,4 @@ def __repr__(self):
LoadExistStrategy = Literal["replace", "append"]
DEFAULT_CHUNK_SIZE = 1000000
ColumnCapitalization = Literal["upper", "lower", "original"]
DEFAULT_SCHEMA = "tmp_transfers"
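DEFAULT_SCHEMA gives database transfers a fallback schema when a Table's Metadata does not name one; a rough, hypothetical sketch of that fallback behaviour follows (the resolve_schema helper is illustrative only, not code from this PR).

from universal_transfer_operator.constants import DEFAULT_SCHEMA
from universal_transfer_operator.datasets.table import Metadata, Table

def resolve_schema(table: Table) -> str:
    # Hypothetical helper: use the schema set on the table, else fall back to DEFAULT_SCHEMA.
    if table.metadata and table.metadata.schema:
        return table.metadata.schema
    return DEFAULT_SCHEMA

resolve_schema(Table(name="uto_s3_table", conn_id="sqlite_default"))  # -> "tmp_transfers"
resolve_schema(Table(name="t", conn_id="sqlite_default", metadata=Metadata(schema="astro")))  # -> "astro"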
@@ -5,12 +5,40 @@
from universal_transfer_operator.constants import TransferMode
from universal_transfer_operator.data_providers.base import DataProviders
from universal_transfer_operator.datasets.base import Dataset
from universal_transfer_operator.datasets.file.base import File
from universal_transfer_operator.datasets.table import Table
from universal_transfer_operator.utils import TransferParameters, get_class_name

DATASET_CONN_ID_TO_DATAPROVIDER_MAPPING = dict.fromkeys(
    ["s3", "aws"], "universal_transfer_operator.data_providers.filesystem.aws.s3"
) | dict.fromkeys(
    ["gs", "google_cloud_platform"], "universal_transfer_operator.data_providers.filesystem.google.cloud.gcs"
DATASET_CONN_ID_TO_DATAPROVIDER_MAPPING = (
    dict.fromkeys(
        [("s3", File), ("aws", File)], "universal_transfer_operator.data_providers.filesystem.aws.s3"
    )
    | dict.fromkeys(
        [("gs", Table), ("google_cloud_platform", Table)],
        "universal_transfer_operator.data_providers.database.google.bigquery",
    )
    | dict.fromkeys(
        [("gs", File), ("google_cloud_platform", File)],
        "universal_transfer_operator.data_providers.filesystem.google.cloud.gcs",
    )
    | dict.fromkeys(
        [
            ("sqlite", Table),
        ],
        "universal_transfer_operator.data_providers.database.sqlite",
    )
    | dict.fromkeys(
        [
            ("snowflake", Table),
        ],
        "universal_transfer_operator.data_providers.database.snowflake",
    )
    | dict.fromkeys(
        [
            (None, File),
        ],
        "universal_transfer_operator.data_providers.filesystem.local",
    )
)
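The mapping keys are now (connection type, dataset class) tuples rather than bare connection types, so a single connection type can resolve to different providers depending on the dataset; for example, reading straight off the mapping above:

# Same Google connection type, different provider module depending on the dataset class:
DATASET_CONN_ID_TO_DATAPROVIDER_MAPPING[("google_cloud_platform", File)]
# -> "universal_transfer_operator.data_providers.filesystem.google.cloud.gcs"
DATASET_CONN_ID_TO_DATAPROVIDER_MAPPING[("google_cloud_platform", Table)]
# -> "universal_transfer_operator.data_providers.database.google.bigquery"

# A File with no connection id falls back to the local filesystem provider:
DATASET_CONN_ID_TO_DATAPROVIDER_MAPPING[(None, File)]
# -> "universal_transfer_operator.data_providers.filesystem.local"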


@@ -19,8 +47,12 @@ def create_dataprovider(
    transfer_params: TransferParameters = None,
    transfer_mode: TransferMode = TransferMode.NONNATIVE,
) -> DataProviders:
    conn_type = BaseHook.get_connection(dataset.conn_id).conn_type
    module_path = DATASET_CONN_ID_TO_DATAPROVIDER_MAPPING[conn_type]
    if dataset.conn_id != "":
        conn_type = BaseHook.get_connection(dataset.conn_id).conn_type
    else:
        conn_type = None
    module_path = DATASET_CONN_ID_TO_DATAPROVIDER_MAPPING[(conn_type, type(dataset))]
    module = importlib.import_module(module_path)
    class_name = get_class_name(module_ref=module, suffix="DataProvider")
    data_provider: DataProviders = getattr(module, class_name)(
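A minimal usage sketch of create_dataprovider with the new keyed lookup; this assumes the function is exposed from universal_transfer_operator.data_providers (the file name is not shown in this diff) and that the referenced Airflow connections exist.

from universal_transfer_operator.data_providers import create_dataprovider
from universal_transfer_operator.datasets.file.base import File
from universal_transfer_operator.datasets.table import Table

# Resolves via ("aws", File) to the S3 filesystem provider module.
s3_provider = create_dataprovider(dataset=File(path="s3://astro-sdk-test/uto/", conn_id="aws_default"))

# Resolves via ("google_cloud_platform", Table) to the BigQuery database provider module,
# because the dataset class is now part of the lookup key.
bq_provider = create_dataprovider(dataset=Table(name="uto_gs_to_bigquery_table", conn_id="google_cloud_default"))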
@@ -97,3 +97,11 @@ def openlineage_dataset_name(self) -> str:
        https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md
        """
        raise NotImplementedError

    @property
    def openlineage_dataset_uri(self) -> str:
        """
        Returns the open lineage dataset uri as per
        https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md
        """
        raise NotImplementedError
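Concrete providers are expected to override these properties following the OpenLineage naming spec linked in the docstrings; below is a purely illustrative, hypothetical GCS-style implementation (the class and attribute names are assumptions, not code from this PR).

class ExampleGCSLineage:
    """Illustrative only: one way the two properties could be satisfied for a GCS dataset."""

    def __init__(self, bucket: str, blob_path: str):
        self.bucket = bucket
        self.blob_path = blob_path

    @property
    def openlineage_dataset_name(self) -> str:
        # Per the OpenLineage naming spec, the dataset name for object storage is the object path.
        return self.blob_path

    @property
    def openlineage_dataset_uri(self) -> str:
        # Namespace (gs://<bucket>) plus dataset name gives a fully qualified dataset URI.
        return f"gs://{self.bucket}/{self.blob_path}"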