diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/69589781-7828-43c5-9f63-8925b1c1ccc2.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/69589781-7828-43c5-9f63-8925b1c1ccc2.json index a682a2d9ca6f..f8417079b2c0 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/69589781-7828-43c5-9f63-8925b1c1ccc2.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/69589781-7828-43c5-9f63-8925b1c1ccc2.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "69589781-7828-43c5-9f63-8925b1c1ccc2", "name": "S3", "dockerRepository": "airbyte/source-s3", - "dockerImageTag": "0.1.4", + "dockerImageTag": "0.1.5", "documentationUrl": "https://docs.airbyte.io/integrations/sources/s3" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 435403ac56f9..071cb2ce3fb8 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -85,7 +85,7 @@ - sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2 name: S3 dockerRepository: airbyte/source-s3 - dockerImageTag: 0.1.4 + dockerImageTag: 0.1.5 documentationUrl: https://docs.airbyte.io/integrations/sources/s3 sourceType: file - sourceDefinitionId: fbb5fbe2-16ad-4cf4-af7d-ff9d9c316c87 diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/plugin.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/plugin.py index c414d3497428..ca7fdd2eb3cb 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/plugin.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/plugin.py @@ -100,6 +100,9 @@ def pytest_collection_modifyitems(config, items): i += len(inner_items) for items in packed_items: + if not hasattr(items[0].cls, 
"config_key"): + # Skip user defined test classes from integration_tests/ directory. + continue test_configs = getattr(config.tests, items[0].cls.config_key()) for test_config, item in zip(test_configs, items): default_timeout = item.get_closest_marker("default_timeout") diff --git a/airbyte-integrations/connectors/source-s3/Dockerfile b/airbyte-integrations/connectors/source-s3/Dockerfile index fc6a74b4e12f..0dc3dde2ff6e 100644 --- a/airbyte-integrations/connectors/source-s3/Dockerfile +++ b/airbyte-integrations/connectors/source-s3/Dockerfile @@ -17,7 +17,7 @@ COPY source_s3 ./source_s3 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.4 +LABEL io.airbyte.version=0.1.5 LABEL io.airbyte.name=airbyte/source-s3 diff --git a/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml b/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml index 9c32b9f7015a..90146466b3eb 100644 --- a/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml @@ -11,13 +11,18 @@ tests: # for Parquet format - config_path: "secrets/parquet_config.json" status: "succeed" + # for custom server + - config_path: "integration_tests/config_minio.json" + status: "succeed" - config_path: "integration_tests/invalid_config.json" status: "failed" discovery: - # for CSV format + # for CSV format - config_path: "secrets/config.json" # for Parquet format - config_path: "secrets/parquet_config.json" + # for custom server + - config_path: "integration_tests/config_minio.json" basic_read: # for CSV format - config_path: "secrets/config.json" @@ -29,17 +34,32 @@ tests: configured_catalog_path: "integration_tests/parquet_configured_catalog.json" expect_records: path: "integration_tests/parquet_expected_records.txt" - incremental: + # for custom server + - config_path: 
"integration_tests/config_minio.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + # expected records contains _ab_source_file_last_modified property which + # is modified all the time s3 file changed and for custom server it is + # file creating date and it always new. Uncomment this line when SAT + # would have ability to ignore specific fields from expected records. + # expect_records: + # path: "integration_tests/expected_records_custom_server.txt.txt" + incremental: # for CSV format - config_path: "secrets/config.json" configured_catalog_path: "integration_tests/configured_catalog.json" - cursor_paths: + cursor_paths: test: ["_ab_source_file_last_modified"] future_state_path: "integration_tests/abnormal_state.json" # for Parquet format - config_path: "secrets/parquet_config.json" configured_catalog_path: "integration_tests/parquet_configured_catalog.json" - cursor_paths: + cursor_paths: + test: ["_ab_source_file_last_modified"] + future_state_path: "integration_tests/abnormal_state.json" + # for custom server + - config_path: "integration_tests/config_minio.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + cursor_paths: test: ["_ab_source_file_last_modified"] future_state_path: "integration_tests/abnormal_state.json" @@ -50,3 +70,6 @@ tests: # for Parquet format - config_path: "secrets/parquet_config.json" configured_catalog_path: "integration_tests/parquet_configured_catalog.json" + # for custom server + - config_path: "integration_tests/config_minio.json" + configured_catalog_path: "integration_tests/configured_catalog.json" diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/acceptance.py b/airbyte-integrations/connectors/source-s3/integration_tests/acceptance.py index d6cbdc97c495..00310d554984 100644 --- a/airbyte-integrations/connectors/source-s3/integration_tests/acceptance.py +++ b/airbyte-integrations/connectors/source-s3/integration_tests/acceptance.py @@ -23,6 +23,11 @@ # 
+import shutil +import tempfile +from zipfile import ZipFile + +import docker import pytest pytest_plugins = ("source_acceptance_test.plugin",) @@ -32,3 +37,22 @@ def connector_setup(): """ This fixture is a placeholder for external resources that acceptance test might require.""" yield + + +@pytest.fixture(scope="session", autouse=True) +def minio_setup(): + client = docker.from_env() + tmp_dir = tempfile.mkdtemp() + with ZipFile("./integration_tests/minio_data.zip") as archive: + archive.extractall(tmp_dir) + + container = client.containers.run( + "minio/minio", + f"server {tmp_dir}/minio_data", + network_mode="host", + volumes=["/tmp:/tmp", "/var/run/docker.sock:/var/run/docker.sock"], + detach=True, + ) + yield + shutil.rmtree(tmp_dir) + container.stop() diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json b/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json new file mode 100644 index 000000000000..fa6158eeb40e --- /dev/null +++ b/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json @@ -0,0 +1,16 @@ +{ + "dataset": "test", + "provider": { + "storage": "S3", + "bucket": "test-bucket", + "aws_access_key_id": "123456", + "aws_secret_access_key": "123456key", + "path_prefix": "", + "endpoint": "http://localhost:9000" + }, + "format": { + "filetype": "csv" + }, + "path_pattern": "*.csv", + "schema": "{}" +} diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/expected_records_custom_server.txt b/airbyte-integrations/connectors/source-s3/integration_tests/expected_records_custom_server.txt new file mode 100644 index 000000000000..ef52db86be11 --- /dev/null +++ b/airbyte-integrations/connectors/source-s3/integration_tests/expected_records_custom_server.txt @@ -0,0 +1,55 @@ +{"stream": "test", "data": {"Year": 1960, "Value": 59184116488.9977, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": 
"china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1961, "Value": 49557050182.9631, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1962, "Value": 46685178504.3274, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1963, "Value": 50097303271.0232, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1964, "Value": 59062254890.1871, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1965, "Value": 69709153115.3147, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1966, "Value": 75879434776.1831, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1967, "Value": 72057028559.6741, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1968, "Value": 69993497892.3132, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1969, "Value": 78718820477.9257, 
"_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1970, "Value": 91506211306.3745, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1971, "Value": 98562023844.1813, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1972, "Value": 112159813640.376, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1973, "Value": 136769878359.668, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1974, "Value": 142254742077.706, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1975, "Value": 161162492226.686, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1976, "Value": 151627687364.405, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1977, "Value": 172349014326.931, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": 
"china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1978, "Value": 148382111520.192, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1979, "Value": 176856525405.729, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1980, "Value": 189649992463.987, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1981, "Value": 194369049090.197, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1982, "Value": 203549627211.606, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1983, "Value": 228950200773.115, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1984, "Value": 258082147252.256, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1985, "Value": 307479585852.339, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1986, "Value": 298805792971.544, 
"_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1987, "Value": 271349773463.863, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1988, "Value": 310722213686.031, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1989, "Value": 345957485871.286, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1990, "Value": 358973230048.399, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1991, "Value": 381454703832.753, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1992, "Value": 424934065934.066, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1993, "Value": 442874596387.119, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1994, "Value": 562261129868.774, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": 
"china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1995, "Value": 732032045217.766, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1996, "Value": 860844098049.121, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1997, "Value": 958159424835.34, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1998, "Value": 1025276902078.73, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1999, "Value": 1089447108705.89, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 2000, "Value": 1205260678391.96, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 2001, "Value": 1332234719889.82, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 2002, "Value": 1461906487857.92, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 2003, "Value": 1649928718134.59, 
"_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 2004, "Value": 1941745602165.09, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 2005, "Value": 2268598904116.28, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 2006, "Value": 2729784031906.09, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 2007, "Value": 3523094314820.9, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 2008, "Value": 4558431073438.2, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 2009, "Value": 5059419738267.41, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 2010, "Value": 6039658508485.59, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 2011, "Value": 7492432097810.11, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": 
"china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 2012, "Value": 8461623162714.07, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 2013, "Value": 9490602600148.49, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 2014, "Value": 10354831729340.4, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/minio_data.zip b/airbyte-integrations/connectors/source-s3/integration_tests/minio_data.zip new file mode 100644 index 000000000000..74f6ba238944 Binary files /dev/null and b/airbyte-integrations/connectors/source-s3/integration_tests/minio_data.zip differ diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/spec.json b/airbyte-integrations/connectors/source-s3/integration_tests/spec.json index bc48673b3d6c..b826eb56aa83 100644 --- a/airbyte-integrations/connectors/source-s3/integration_tests/spec.json +++ b/airbyte-integrations/connectors/source-s3/integration_tests/spec.json @@ -116,13 +116,11 @@ "title": "Columns", "description": "If you only want to sync a subset of the columns from the file(s), add the columns you want here. Leave it empty to sync all columns.", "type": "array", - "items": { - "type": "string" - } + "items": { "type": "string" } }, "batch_size": { "title": "Batch Size", - "description": "Maximum number of records per batch. Batches may be smaller if there aren’t enough rows in the file. 
This option can help to optimize a work with memory if your data is particularly wide or failing during detection of OOM errors.", + "description": "Maximum number of records per batch. Batches may be smaller if there aren\u2019t enough rows in the file. This option can help to optimize a work with memory if your data is particularly wide or failing during detection of OOM errors.", "default": 65536, "type": "integer" } @@ -156,6 +154,22 @@ "description": "By providing a path-like prefix (e.g. myFolder/thisTable/) under which all the relevant files sit, we can optimise finding these in S3. This is optional but recommended if your bucket contains many folders/files.", "default": "", "type": "string" + }, + "endpoint": { + "title": "Endpoint", + "description": "Endpoint to an S3 compatible service. Leave empty to use AWS.", + "default": "", + "type": "string" + }, + "use_ssl": { + "title": "Use Ssl", + "description": "Is remote server using secure SSL/TLS connection", + "type": "boolean" + }, + "verify_ssl_cert": { + "title": "Verify Ssl Cert", + "description": "Allow self signed certificates", + "type": "boolean" } }, "required": ["bucket"] diff --git a/airbyte-integrations/connectors/source-s3/source_s3/s3_utils.py b/airbyte-integrations/connectors/source-s3/source_s3/s3_utils.py new file mode 100644 index 000000000000..d94c7feb7b41 --- /dev/null +++ b/airbyte-integrations/connectors/source-s3/source_s3/s3_utils.py @@ -0,0 +1,77 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and 
this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + +import boto3.session +from botocore.client import Config + +from .source import SourceFilesAbstract + + +def make_s3_resource(provider: dict, session: boto3.session.Session, config: Config = None) -> object: + """ + Construct boto3 resource with specified config and remote endpoint + :param provider provider configuration from connector configuration. + :param session User session to create client from. + :param config Client config parameter in case of using creds from .aws/config file. + :return Boto3 S3 resource instance. + """ + client_kv_args = _get_s3_client_args(provider, config) + return session.resource("s3", **client_kv_args) + + +def make_s3_client(provider: dict, session: boto3.session.Session = None, config: Config = None) -> object: + """ + Construct boto3 client with specified config and remote endpoint + :param provider provider configuration from connector configuration. + :param session User session to create client from. Default boto3 session is used when no session is specified. + :param config Client config parameter in case of using creds from .aws/config file. + :return Boto3 S3 client instance. 
+ """ + client_kv_args = _get_s3_client_args(provider, config) + if session is None: + return boto3.client("s3", **client_kv_args) + else: + return session.client("s3", **client_kv_args) + + +def _get_s3_client_args(provider: dict, config: Config) -> dict: + """ + Returns map of args used for creating s3 boto3 client. + :param provider provider configuration from connector configuration. + :param config Client config parameter in case of using creds from .aws/config file. + :return map of s3 client arguments. + """ + client_kv_args = {"config": config} + endpoint = provider.get("endpoint") + if endpoint: + # endpoint could be None or empty string, set to default Amazon endpoint in + # this case. + client_kv_args["endpoint_url"] = endpoint + client_kv_args["use_ssl"] = provider.get("use_ssl") + client_kv_args["verify"] = provider.get("verify_ssl_cert") + + return client_kv_args + + +__all__ = ["SourceFilesAbstract", "make_s3_client", "make_s3_resource"] diff --git a/airbyte-integrations/connectors/source-s3/source_s3/s3file.py b/airbyte-integrations/connectors/source-s3/source_s3/s3file.py index 0bcbc287d2d2..98c0483c37b9 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/s3file.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/s3file.py @@ -27,13 +27,13 @@ from datetime import datetime from typing import BinaryIO, Iterator, TextIO, Union -import boto3 import smart_open from boto3 import session as boto3session from botocore import UNSIGNED from botocore.client import Config as ClientConfig from botocore.config import Config from botocore.exceptions import NoCredentialsError +from source_s3.s3_utils import make_s3_client, make_s3_resource from .source_files_abstract.storagefile import StorageFile @@ -54,12 +54,10 @@ def _setup_boto_session(self): aws_access_key_id=self._provider.get("aws_access_key_id"), aws_secret_access_key=self._provider.get("aws_secret_access_key"), ) + self._boto_s3_resource = make_s3_resource(self._provider, 
session=self._boto_session) else: self._boto_session = boto3session.Session() - if self.use_aws_account: - self._boto_s3_resource = self._boto_session.resource("s3") - else: - self._boto_s3_resource = self._boto_session.resource("s3", config=Config(signature_version=UNSIGNED)) + self._boto_s3_resource = make_s3_resource(self._provider, config=Config(signature_version=UNSIGNED), session=self._boto_session) @property def last_modified(self) -> datetime: @@ -79,9 +77,9 @@ def last_modified(self) -> datetime: if self.use_aws_account(self._provider): raise nce else: - return boto3.client("s3", config=ClientConfig(signature_version=UNSIGNED)).head_object(Bucket=bucket, Key=self.url)[ - "LastModified" - ] + return make_s3_client(self._provider, config=ClientConfig(signature_version=UNSIGNED)).head_object( + Bucket=bucket, Key=self.url + )["LastModified"] @staticmethod def use_aws_account(provider: dict) -> bool: @@ -101,12 +99,11 @@ def open(self, binary: bool) -> Iterator[Union[TextIO, BinaryIO]]: bucket = self._provider.get("bucket") if self.use_aws_account(self._provider): - aws_access_key_id = self._provider.get("aws_access_key_id", "") - aws_secret_access_key = self._provider.get("aws_secret_access_key", "") - result = smart_open.open(f"s3://{aws_access_key_id}:{aws_secret_access_key}@{bucket}/{self.url}", mode=mode) + params = {"client": make_s3_client(self._provider, session=self._boto_session)} + result = smart_open.open(f"s3://{bucket}/{self.url}", transport_params=params, mode=mode) else: config = ClientConfig(signature_version=UNSIGNED) - params = {"client": boto3.client("s3", config=config)} + params = {"client": make_s3_client(self._provider, config=config)} result = smart_open.open(f"s3://{bucket}/{self.url}", transport_params=params, mode=mode) # see https://docs.python.org/3/library/contextlib.html#contextlib.contextmanager for why we do this diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source.py 
b/airbyte-integrations/connectors/source-s3/source_s3/source.py index b4c8ea728769..323ea84fd307 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source.py @@ -56,7 +56,11 @@ class Config: description="By providing a path-like prefix (e.g. myFolder/thisTable/) under which all the relevant files sit, we can optimise finding these in S3. This is optional but recommended if your bucket contains many folders/files.", ) - provider: S3Provider = Field(...) + endpoint: str = Field("", description="Endpoint to an S3 compatible service. Leave empty to use AWS.") + use_ssl: bool = Field(default=None, description="Is remote server using secure SSL/TLS connection") + verify_ssl_cert: bool = Field(default=None, description="Allow self signed certificates") + + provider: S3Provider class SourceS3(SourceFilesAbstract): diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/__init__.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/__init__.py index c52966eb454b..9db886e0930f 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/__init__.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/__init__.py @@ -1,27 +1,23 @@ -""" -MIT License - -Copyright (c) 2020 Airbyte - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. 
- -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. -""" - -from .source import SourceFilesAbstract - -__all__ = ["SourceFilesAbstract"] +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. 
+# diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/source.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/source.py index e65915b6af9d..784d639d64fb 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/source.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/source.py @@ -81,7 +81,7 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> found_a_file = False try: - for filepath in self.stream_class.filepath_iterator(logger, config.get("provider")): + for filepath in self.stream_class(**config).filepath_iterator(): found_a_file = True # TODO: will need to split config.get("path_pattern") up by stream once supporting multiple streams # test that matching on the pattern doesn't error diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/spec.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/spec.py index abe4029f408b..df17391f605a 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/spec.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/spec.py @@ -25,7 +25,6 @@ import json import re -from copy import deepcopy from typing import Union from jsonschema import RefResolver @@ -85,11 +84,12 @@ class SourceFilesAbstractSpec(BaseModel): @staticmethod def change_format_to_oneOf(schema: dict) -> dict: - schema["properties"]["format"]["type"] = "object" - if "oneOf" in schema["properties"]["format"]: - return schema - schema["properties"]["format"]["oneOf"] = deepcopy(schema["properties"]["format"]["anyOf"]) - del schema["properties"]["format"]["anyOf"] + props_to_change = ["format"] + for prop in props_to_change: + schema["properties"][prop]["type"] = "object" + if "oneOf" in schema["properties"][prop]: + continue + schema["properties"][prop]["oneOf"] = 
schema["properties"][prop].pop("anyOf") return schema @staticmethod diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py index 836c93d055b3..e988f3f2d80d 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py @@ -142,16 +142,13 @@ def storagefile_class(self) -> type: :return: reference to relevant class """ - @staticmethod @abstractmethod - def filepath_iterator(logger: AirbyteLogger, provider: dict) -> Iterator[str]: + def filepath_iterator() -> Iterator[str]: """ Provider-specific method to iterate through bucket/container/etc. and yield each full filepath. This should supply the 'url' to use in StorageFile(). This is possibly better described as blob or file path. e.g. for AWS: f"s3://{aws_access_key_id}:{aws_secret_access_key}@{self.url}" <- self.url is what we want to yield here - :param logger: instance of AirbyteLogger to use as this is a staticmethod - :param provider: provider specific mapping as described in spec.json :yield: url filepath to use in StorageFile() """ @@ -185,7 +182,7 @@ def get_storagefile_with_lastmod(filepath: str) -> Tuple[datetime, StorageFile]: # TODO: don't hardcode max_workers like this with concurrent.futures.ThreadPoolExecutor(max_workers=64) as executor: - filepath_gen = self.pattern_matched_filepath_iterator(self.filepath_iterator(self.logger, self._provider)) + filepath_gen = self.pattern_matched_filepath_iterator(self.filepath_iterator()) futures = [executor.submit(get_storagefile_with_lastmod, fp) for fp in filepath_gen] diff --git a/airbyte-integrations/connectors/source-s3/source_s3/stream.py b/airbyte-integrations/connectors/source-s3/source_s3/stream.py index f59134aa40ec..9ed55cf1a067 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/stream.py +++ 
b/airbyte-integrations/connectors/source-s3/source_s3/stream.py @@ -23,12 +23,12 @@ # -from typing import Any, Iterator, Mapping +from typing import Iterator -from airbyte_cdk.logger import AirbyteLogger from boto3 import session as boto3session from botocore import UNSIGNED from botocore.config import Config +from source_s3.s3_utils import make_s3_client from .s3file import S3File from .source_files_abstract.stream import IncrementalFileStream @@ -39,23 +39,24 @@ class IncrementalFileStreamS3(IncrementalFileStream): def storagefile_class(self) -> type: return S3File - @staticmethod - def _list_bucket(provider: Mapping[str, Any], accept_key=lambda k: True) -> Iterator[str]: + def _list_bucket(self, accept_key=lambda k: True) -> Iterator[str]: """ Wrapper for boto3's list_objects_v2 so we can handle pagination, filter by lambda func and operate with or without credentials - :param provider: provider specific mapping as described in spec.json :param accept_key: lambda function to allow filtering return keys, e.g. 
lambda k: not k.endswith('/'), defaults to lambda k: True :yield: key (name) of each object """ + provider = self._provider + + client_config = None if S3File.use_aws_account(provider): session = boto3session.Session( aws_access_key_id=provider["aws_access_key_id"], aws_secret_access_key=provider["aws_secret_access_key"] ) - client = session.client("s3") else: session = boto3session.Session() - client = session.client("s3", config=Config(signature_version=UNSIGNED)) + client_config = Config(signature_version=UNSIGNED) + client = make_s3_client(self._provider, config=client_config, session=session) ctoken = None while True: @@ -79,23 +80,18 @@ def _list_bucket(provider: Mapping[str, Any], accept_key=lambda k: True) -> Iter if not ctoken: break - @staticmethod - def filepath_iterator(logger: AirbyteLogger, provider: dict) -> Iterator[str]: + def filepath_iterator(self) -> Iterator[str]: """ See _list_bucket() for logic of interacting with S3 - :param logger: instance of AirbyteLogger to use as this is a staticmethod - :param provider: S3 provider mapping as described in spec.json :yield: url filepath to use in S3File() """ - prefix = provider.get("path_prefix") + prefix = self._provider.get("path_prefix") if prefix is None: prefix = "" - msg = f"Iterating S3 bucket '{provider['bucket']}'" - logger.info(msg + f" with prefix: '{prefix}' " if prefix != "" else msg) + msg = f"Iterating S3 bucket '{self._provider['bucket']}'" + self.logger.info(msg + f" with prefix: '{prefix}' " if prefix != "" else msg) - for blob in IncrementalFileStreamS3._list_bucket( - provider=provider, accept_key=lambda k: not k.endswith("/") # filter out 'folders', we just want actual blobs - ): + for blob in self._list_bucket(accept_key=lambda k: not k.endswith("/")): # filter out 'folders', we just want actual blobs yield blob diff --git a/docs/integrations/sources/s3.md b/docs/integrations/sources/s3.md index 6decfc026854..fae4e8e09744 100644 --- a/docs/integrations/sources/s3.md +++ 
b/docs/integrations/sources/s3.md @@ -6,6 +6,7 @@ The S3 source enables syncing of file-based tables with support for multiple fil You can choose if this connector will read only the new/updated files, or all the matching files, every time a sync is run. +The connector allows using either Amazon S3 storage or a 3rd-party S3-compatible service like Wasabi, or custom S3 services set up with MinIO, LeoFS, Ceph, etc. ### Output Schema At this time, this source produces only a single stream (table) for the target files. @@ -156,7 +157,9 @@ For example: - `aws_access_key_id` : one half of the [required credentials](https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys) for accessing a private bucket. - `aws_secret_access_key` : other half of the [required credentials](https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys) for accessing a private bucket. - `path_prefix` : an optional string that limits the files returned by AWS when listing files to only that those starting with this prefix. This is different to path_pattern as it gets pushed down to the API call made to S3 rather than filtered in Airbyte and it does not accept pattern-style symbols (like wildcards `*`). We recommend using this if your bucket has many folders and files that are unrelated to this stream and all the relevant files will always sit under this chosen prefix. - +- `endpoint` : optional parameter that allows using non-Amazon S3-compatible services. Leave it blank to use the default Amazon service. +- `use_ssl` : Allows using custom servers that are configured to use plain HTTP. Ignored when using the Amazon service. +- `verify_ssl_cert` : Skips the SSL certificate validity check when using custom servers with self-signed certificates. Ignored when using the Amazon service. 
### File Format Settings The Reader in charge of loading the file format is currently based on [PyArrow](https://arrow.apache.org/docs/python/generated/pyarrow.csv.open_csv.html) (Apache Arrow). Note that all files within one stream must adhere to the same read options for every provided format. @@ -195,6 +198,7 @@ You can find details on [here](https://arrow.apache.org/docs/python/generated/py | Version | Date | Pull Request | Subject | | :------ | :-------- | :----- | :------ | +| 0.1.5 | 2021-09-24 | [6398](https://github.com/airbytehq/airbyte/pull/6398) | Support custom non-Amazon S3 services | | 0.1.4 | 2021-08-13 | [5305](https://github.com/airbytehq/airbyte/pull/5305) | Support of Parquet format | | 0.1.3 | 2021-08-04 | [5197](https://github.com/airbytehq/airbyte/pull/5197) | Fixed bug where sync could hang indefinitely on schema inference | | 0.1.2 | 2021-08-02 | [5135](https://github.com/airbytehq/airbyte/pull/5135) | Fixed bug in spec so it displays in UI correctly |