From 67e93901c15fbd4eed28ef4ce883c5877598ec58 Mon Sep 17 00:00:00 2001 From: derek1032 Date: Thu, 25 Aug 2022 11:07:34 +0000 Subject: [PATCH 1/6] fix: FeastModuleImportError Signed-off-by: derek1032 --- .../infra/offline_stores/contrib/athena_repo_configuration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/athena_repo_configuration.py b/sdk/python/feast/infra/offline_stores/contrib/athena_repo_configuration.py index 32376eb652..f5e78ffb90 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/athena_repo_configuration.py +++ b/sdk/python/feast/infra/offline_stores/contrib/athena_repo_configuration.py @@ -1,7 +1,7 @@ from tests.integration.feature_repos.integration_test_repo_config import ( IntegrationTestRepoConfig, ) -from tests.integration.feature_repos.universal.data_sources.athena import ( +from feast.infra.offline_stores.contrib.athena_offline_store.tests.data_source import ( AthenaDataSourceCreator, ) From 19828fcf54ffa8296fcff9eae6916fef6fbeafd6 Mon Sep 17 00:00:00 2001 From: derek1032 Date: Thu, 25 Aug 2022 11:27:29 +0000 Subject: [PATCH 2/6] chore: Add aws region for athena universal tests Signed-off-by: derek1032 --- Makefile | 8 +++++--- .../athena_offline_store/tests/data_source.py | 16 +++++++++++----- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/Makefile b/Makefile index 8e03ed5349..bce14255f8 100644 --- a/Makefile +++ b/Makefile @@ -159,14 +159,16 @@ test-python-universal-mssql: sdk/python/tests -#To use Athena as an offline store, you need to create an Athena database and an S3 bucket on AWS. https://docs.aws.amazon.com/athena/latest/ug/getting-started.html -#Modify environment variables ATHENA_DATA_SOURCE, ATHENA_DATABASE, ATHENA_S3_BUCKET_NAME if you want to change the data source, database, and bucket name of S3 to use. -#If tests fail with the pytest -n 8 option, change the number to 1. +# To use Athena as an offline store, you need to create an Athena database and an S3 bucket on AWS. +# https://docs.aws.amazon.com/athena/latest/ug/getting-started.html +# Modify environment variables ATHENA_REGION, ATHENA_DATA_SOURCE, ATHENA_DATABASE, ATHENA_S3_BUCKET_NAME +# according to your needs. If tests fail with the pytest -n 8 option, change the number to 1. test-python-universal-athena: PYTHONPATH='.' \ FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.offline_stores.contrib.athena_repo_configuration \ PYTEST_PLUGINS=feast.infra.offline_stores.contrib.athena_offline_store.tests \ FEAST_USAGE=False IS_TEST=True \ + ATHENA_REGION=ap-northeast-2 \ ATHENA_DATA_SOURCE=AwsDataCatalog \ ATHENA_DATABASE=default \ ATHENA_S3_BUCKET_NAME=feast-integration-tests \ diff --git a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/tests/data_source.py index 92e0d6e5f6..2a5cabc2d1 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/tests/data_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/tests/data_source.py @@ -27,8 +27,11 @@ class AthenaDataSourceCreator(DataSourceCreator): def __init__(self, project_name: str, *args, **kwargs): super().__init__(project_name) - self.client = aws_utils.get_athena_data_client("ap-northeast-2") - self.s3 = aws_utils.get_s3_resource("ap-northeast-2") + region = ( + os.environ.get("ATHENA_REGION") + if os.environ.get("ATHENA_REGION") + else "ap-northeast-2" + ) data_source = ( os.environ.get("ATHENA_DATA_SOURCE") if os.environ.get("ATHENA_DATA_SOURCE") @@ -44,10 +47,13 @@ def __init__(self, project_name: str, *args, **kwargs): if os.environ.get("ATHENA_S3_BUCKET_NAME") else "feast-integration-tests" ) + + self.client = aws_utils.get_athena_data_client(region) + self.s3 = aws_utils.get_s3_resource(region) self.offline_store_config = AthenaOfflineStoreConfig( - data_source=f"{data_source}", - region="ap-northeast-2", - database=f"{database}", + data_source=data_source, + region=region, + database=database, s3_staging_location=f"s3://{bucket_name}/test_dir", ) From cae3f2ed3ceefa909b71db61d6acbce46f259d20 Mon Sep 17 00:00:00 2001 From: derek1032 Date: Thu, 25 Aug 2022 11:41:26 +0000 Subject: [PATCH 3/6] feat: Add workgroup attribute for Athena offline store config Signed-off-by: derek1032 --- Makefile | 5 +- .../contrib/athena_offline_store/athena.py | 11 +++ .../athena_offline_store/athena_source.py | 1 + .../athena_offline_store/tests/data_source.py | 8 +++ sdk/python/feast/infra/utils/aws_utils.py | 67 ++++++++++++------- 5 files changed, 67 insertions(+), 25 deletions(-) diff --git a/Makefile b/Makefile index bce14255f8..990604da57 100644 --- a/Makefile +++ b/Makefile @@ -161,8 +161,8 @@ test-python-universal-mssql: # To use Athena as an offline store, you need to create an Athena database and an S3 bucket on AWS. # https://docs.aws.amazon.com/athena/latest/ug/getting-started.html -# Modify environment variables ATHENA_REGION, ATHENA_DATA_SOURCE, ATHENA_DATABASE, ATHENA_S3_BUCKET_NAME -# according to your needs. If tests fail with the pytest -n 8 option, change the number to 1. +# Modify environment variables ATHENA_REGION, ATHENA_DATA_SOURCE, ATHENA_DATABASE, ATHENA_WORKGROUP or +# ATHENA_S3_BUCKET_NAME according to your needs. If tests fail with the pytest -n 8 option, change the number to 1. test-python-universal-athena: PYTHONPATH='.' \ FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.offline_stores.contrib.athena_repo_configuration \ @@ -171,6 +171,7 @@ test-python-universal-athena: ATHENA_REGION=ap-northeast-2 \ ATHENA_DATA_SOURCE=AwsDataCatalog \ ATHENA_DATABASE=default \ + ATHENA_WORKGROUP=primary \ ATHENA_S3_BUCKET_NAME=feast-integration-tests \ python -m pytest -n 8 --integration \ -k "not test_go_feature_server and \ diff --git a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py index 5095a43d57..ad3aae3bf9 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py +++ b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py @@ -59,6 +59,9 @@ class AthenaOfflineStoreConfig(FeastConfigBaseModel): database: StrictStr """ Athena database name """ + + workgroup: StrictStr + """ Athena workgroup name """ s3_staging_location: StrictStr """ S3 path for importing & exporting data to Athena """ @@ -243,6 +246,7 @@ def query_generator() -> Iterator[str]: athena_client, config.offline_store.data_source, config.offline_store.database, + config.offline_store.workgroup, f"DROP TABLE IF EXISTS {config.offline_store.database}.{table_name}", ) @@ -293,6 +297,7 @@ def write_logged_features( athena_client=athena_client, data_source=config.offline_store.data_source, database=config.offline_store.database, + workgroup=config.offline_store.workgroup, s3_resource=s3_resource, s3_path=s3_path, table_name=destination.table_name, @@ -378,6 +383,7 @@ def _to_df_internal(self) -> pd.DataFrame: self._athena_client, self._config.offline_store.data_source, self._config.offline_store.database, + self._config.offline_store.workgroup, self._s3_resource, temp_external_location, self.get_temp_table_dml_header(temp_table_name, temp_external_location) @@ -394,6 +400,7 @@ def _to_arrow_internal(self) -> pa.Table: self._athena_client, self._config.offline_store.data_source, self._config.offline_store.database, + self._config.offline_store.workgroup, self._s3_resource, temp_external_location, self.get_temp_table_dml_header(temp_table_name, temp_external_location) @@ -432,6 +439,7 @@ def to_athena(self, table_name: str) -> None: self._athena_client, self._config.offline_store.data_source, self._config.offline_store.database, + self._config.offline_store.workgroup, query, ) @@ -449,6 +457,7 @@ def _upload_entity_df( athena_client, config.offline_store.data_source, config.offline_store.database, + config.offline_store.workgroup, s3_resource, f"{config.offline_store.s3_staging_location}/entity_df/{table_name}/{table_name}.parquet", table_name, @@ -460,6 +469,7 @@ def _upload_entity_df( athena_client, config.offline_store.data_source, config.offline_store.database, + config.offline_store.workgroup, f"CREATE TABLE {table_name} AS ({entity_df})", ) else: @@ -514,6 +524,7 @@ def _get_entity_df_event_timestamp_range( athena_client, config.offline_store.data_source, config.offline_store.database, + config.offline_store.workgroup, f"SELECT MIN({entity_df_event_timestamp_col}) AS min, MAX({entity_df_event_timestamp_col}) AS max " f"FROM ({entity_df})", ) diff --git a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena_source.py b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena_source.py index bac027ff3e..8e9e3893f3 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena_source.py @@ -225,6 +225,7 @@ def get_table_column_names_and_types( client, config.offline_store.data_source, config.offline_store.database, + config.offline_store.workgroup, f"SELECT * FROM ({self.query}) LIMIT 1", ) columns = aws_utils.get_athena_query_result(client, statement_id)[ diff --git a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/tests/data_source.py index 2a5cabc2d1..ce68e6eee1 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/tests/data_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/tests/data_source.py @@ -42,6 +42,11 @@ def __init__(self, project_name: str, *args, **kwargs): if os.environ.get("ATHENA_DATABASE") else "default" ) + workgroup = ( + os.environ.get("ATHENA_WORKGROUP") + if os.environ.get("ATHENA_WORKGROUP") + else "primary" + ) bucket_name = ( os.environ.get("ATHENA_S3_BUCKET_NAME") if os.environ.get("ATHENA_S3_BUCKET_NAME") @@ -54,6 +59,7 @@ def __init__(self, project_name: str, *args, **kwargs): data_source=data_source, region=region, database=database, + workgroup=workgroup, s3_staging_location=f"s3://{bucket_name}/test_dir", ) @@ -83,6 +89,7 @@ def create_data_source( self.client, self.offline_store_config.data_source, self.offline_store_config.database, + self.offline_store_config.workgroup, self.s3, s3_target, table_name, @@ -132,5 +139,6 @@ def teardown(self): self.client, self.offline_store_config.data_source, self.offline_store_config.database, + self.offline_store_config.workgroup, f"DROP TABLE IF EXISTS {table}", ) diff --git a/sdk/python/feast/infra/utils/aws_utils.py b/sdk/python/feast/infra/utils/aws_utils.py index 07ce3ab17d..f3e39506e4 100644 --- a/sdk/python/feast/infra/utils/aws_utils.py +++ b/sdk/python/feast/infra/utils/aws_utils.py @@ -677,7 +677,7 @@ def list_s3_files(aws_region: str, path: str) -> List[str]: return files -# Athena +# Athena utils def get_athena_data_client(aws_region: str): @@ -696,16 +696,17 @@ def get_athena_data_client(aws_region: str): reraise=True, ) def execute_athena_query_async( - athena_data_client, data_source: str, database: str, query: str + athena_data_client, data_source: str, database: str, workgroup: str, query: str ) -> dict: """Execute Athena statement asynchronously. Does not wait for the query to finish. Raises AthenaCredentialsError if the statement couldn't be executed due to the validation error. Args: - athena_data_client: athena Data API Service client - data_source: athena Cluster Identifier - database: athena Database Name + athena_data_client: Athena Data API Service client + data_source: Athena Data Source + database: Athena Database Name + workgroup: Athena Workgroup Name query: The SQL query to execute Returns: JSON response @@ -716,7 +717,7 @@ def execute_athena_query_async( return athena_data_client.start_query_execution( QueryString=query, QueryExecutionContext={"Database": database}, - WorkGroup="primary", + WorkGroup=workgroup, ) except ClientError as e: @@ -755,16 +756,17 @@ def wait_for_athena_execution(athena_data_client, execution: dict) -> None: def drop_temp_table( - athena_data_client, data_source: str, database: str, temp_table: str + athena_data_client, data_source: str, database: str, workgroup: str, temp_table: str ): query = f"DROP TABLE `{database}.{temp_table}`" - execute_athena_query_async(athena_data_client, data_source, database, query) + execute_athena_query_async(athena_data_client, data_source, database, workgroup, query) def execute_athena_query( athena_data_client, data_source: str, database: str, + workgroup: str, query: str, temp_table: str = None, ) -> str: @@ -775,22 +777,23 @@ def execute_athena_query( Args: - athena_data_client: athena Data API Service client - data_source: athena data source Name - database: athena Database Name + athena_data_client: Athena Data API Service client + data_source: Athena Data Source Name + database: Athena Database Name + workgroup: Athena Workgroup Name query: The SQL query to execute - temp_table: temp table name to be deleted after query execution. + temp_table: Temp table name to be deleted after query execution. Returns: Statement ID """ execution = execute_athena_query_async( - athena_data_client, data_source, database, query + athena_data_client, data_source, database, workgroup, query ) wait_for_athena_execution(athena_data_client, execution) if temp_table is not None: - drop_temp_table(athena_data_client, data_source, database, temp_table) + drop_temp_table(athena_data_client, data_source, database, workgroup, temp_table) return execution["QueryExecutionId"] @@ -822,6 +825,7 @@ def unload_athena_query_to_pa( athena_data_client, data_source: str, database: str, + workgroup: str, s3_resource, s3_path: str, query: str, @@ -831,7 +835,7 @@ def unload_athena_query_to_pa( bucket, key = get_bucket_and_key(s3_path) execute_athena_query_and_unload_to_s3( - athena_data_client, data_source, database, query, temp_table + athena_data_client, data_source, database, workgroup, query, temp_table ) with tempfile.TemporaryDirectory() as temp_dir: @@ -844,6 +848,7 @@ def unload_athena_query_to_df( athena_data_client, data_source: str, database: str, + workgroup: str, s3_resource, s3_path: str, query: str, @@ -854,6 +859,7 @@ def unload_athena_query_to_df( athena_data_client, data_source, database, + workgroup, s3_resource, s3_path, query, @@ -866,6 +872,7 @@ def execute_athena_query_and_unload_to_s3( athena_data_client, data_source: str, database: str, + workgroup: str, query: str, temp_table: str, ) -> None: @@ -873,20 +880,29 @@ def execute_athena_query_and_unload_to_s3( Args: athena_data_client: Athena Data API Service client - data_source: Athena data source - database: Redshift Database Name + data_source: Athena Data Source + database: Athena Database Name + workgroup: Athena Workgroup Name query: The SQL query to execute temp_table: temp table name to be deleted after query execution. """ - execute_athena_query(athena_data_client, data_source, database, query, temp_table) + execute_athena_query( + athena_data_client=athena_data_client, + data_source=data_source, + database=database, + workgroup=workgroup, + query=query, + temp_table=temp_table, + ) def upload_df_to_athena( athena_client, data_source: str, database: str, + workgroup: str, s3_resource, s3_path: str, table_name: str, @@ -900,6 +916,7 @@ def upload_df_to_athena( athena_client: Athena API Service client data_source: Athena Data Source database: Athena Database Name + workgroup: Athena Workgroup Name s3_resource: S3 Resource object s3_path: S3 path where the Parquet file is temporarily uploaded table_name: The name of the new Data Catalog table where we copy the dataframe @@ -924,6 +941,7 @@ def upload_df_to_athena( athena_client, data_source=data_source, database=database, + workgroup=workgroup, s3_resource=s3_resource, s3_path=s3_path, table_name=table_name, @@ -935,6 +953,7 @@ def upload_arrow_table_to_athena( athena_client, data_source: str, database: str, + workgroup: str, s3_resource, s3_path: str, table_name: str, @@ -952,8 +971,9 @@ def upload_arrow_table_to_athena( Args: table: The Arrow Table or Path to parquet dataset to upload athena_client: Athena API Service client - data_source: Athena data source + data_source: Athena Data Source database: Athena Database Name + workgroup: Athena Workgroup Name s3_resource: S3 Resource object s3_path: S3 path where the Parquet file is temporarily uploaded table_name: The name of the new Athena table where we copy the dataframe @@ -995,10 +1015,11 @@ def upload_arrow_table_to_athena( try: execute_athena_query( - athena_client, - data_source, - database, - f"{create_query}", + athena_data_client=athena_client, + data_source=data_source, + database=database, + workgroup=workgroup, + query=f"{create_query}", ) finally: pass From 09077dfb2a30f21c891270d22b276611a2b17d66 Mon Sep 17 00:00:00 2001 From: derek1032 Date: Thu, 25 Aug 2022 11:45:58 +0000 Subject: [PATCH 4/6] fix: Add fail_if_exists condition to query string Signed-off-by: derek1032 --- sdk/python/feast/infra/utils/aws_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/utils/aws_utils.py b/sdk/python/feast/infra/utils/aws_utils.py index f3e39506e4..43cbd45d04 100644 --- a/sdk/python/feast/infra/utils/aws_utils.py +++ b/sdk/python/feast/infra/utils/aws_utils.py @@ -1006,7 +1006,7 @@ def upload_arrow_table_to_athena( s3_resource.Object(bucket, key).put(Body=parquet_temp_file) create_query = ( - f"CREATE EXTERNAL TABLE {database}.{table_name} " + f"CREATE EXTERNAL TABLE {database}.{table_name} {'IF NOT EXISTS' if not fail_if_exists else ''}" f"({column_query_list}) " f"STORED AS PARQUET " f"LOCATION '{s3_path[:s3_path.rfind('/')]}' " From f8e980230e1cb0feb944e844749a82ef6d415717 Mon Sep 17 00:00:00 2001 From: derek1032 Date: Thu, 25 Aug 2022 15:46:03 +0000 Subject: [PATCH 5/6] chore: Format python codes Signed-off-by: derek1032 --- .../contrib/athena_offline_store/athena.py | 2 +- .../athena_offline_store/tests/data_source.py | 2 +- .../contrib/athena_repo_configuration.py | 6 +++--- sdk/python/feast/infra/utils/aws_utils.py | 20 +++++++++++-------- 4 files changed, 17 insertions(+), 13 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py index ad3aae3bf9..e3bb4e8cca 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py +++ b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py @@ -59,7 +59,7 @@ class AthenaOfflineStoreConfig(FeastConfigBaseModel): database: StrictStr """ Athena database name """ - + workgroup: StrictStr """ Athena workgroup name """ diff --git a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/tests/data_source.py index ce68e6eee1..87d0afc622 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/tests/data_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/tests/data_source.py @@ -52,7 +52,7 @@ def __init__(self, project_name: str, *args, **kwargs): if os.environ.get("ATHENA_S3_BUCKET_NAME") else "feast-integration-tests" ) - + self.client = aws_utils.get_athena_data_client(region) self.s3 = aws_utils.get_s3_resource(region) self.offline_store_config = AthenaOfflineStoreConfig( diff --git a/sdk/python/feast/infra/offline_stores/contrib/athena_repo_configuration.py b/sdk/python/feast/infra/offline_stores/contrib/athena_repo_configuration.py index f5e78ffb90..09bc6ce961 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/athena_repo_configuration.py +++ b/sdk/python/feast/infra/offline_stores/contrib/athena_repo_configuration.py @@ -1,9 +1,9 @@ -from tests.integration.feature_repos.integration_test_repo_config import ( - IntegrationTestRepoConfig, -) from feast.infra.offline_stores.contrib.athena_offline_store.tests.data_source import ( AthenaDataSourceCreator, ) +from tests.integration.feature_repos.integration_test_repo_config import ( + IntegrationTestRepoConfig, +) FULL_REPO_CONFIGS = [ IntegrationTestRepoConfig( diff --git a/sdk/python/feast/infra/utils/aws_utils.py b/sdk/python/feast/infra/utils/aws_utils.py index 43cbd45d04..7e8335ac92 100644 --- a/sdk/python/feast/infra/utils/aws_utils.py +++ b/sdk/python/feast/infra/utils/aws_utils.py @@ -759,7 +759,9 @@ def drop_temp_table( athena_data_client, data_source: str, database: str, workgroup: str, temp_table: str ): query = f"DROP TABLE `{database}.{temp_table}`" - execute_athena_query_async(athena_data_client, data_source, database, workgroup, query) + execute_athena_query_async( + athena_data_client, data_source, database, workgroup, query + ) def execute_athena_query( @@ -793,7 +795,9 @@ def execute_athena_query( ) wait_for_athena_execution(athena_data_client, execution) if temp_table is not None: - drop_temp_table(athena_data_client, data_source, database, workgroup, temp_table) + drop_temp_table( + athena_data_client, data_source, database, workgroup, temp_table + ) return execution["QueryExecutionId"] @@ -889,13 +893,13 @@ def execute_athena_query_and_unload_to_s3( """ execute_athena_query( - athena_data_client=athena_data_client, - data_source=data_source, - database=database, - workgroup=workgroup, - query=query, + athena_data_client=athena_data_client, + data_source=data_source, + database=database, + workgroup=workgroup, + query=query, temp_table=temp_table, - ) + ) def upload_df_to_athena( From aca260696d02f2db24619bbd806662d2bc32ba58 Mon Sep 17 00:00:00 2001 From: derek1032 Date: Thu, 25 Aug 2022 16:00:50 +0000 Subject: [PATCH 6/6] chore: use os.getenv and default value Signed-off-by: derek1032 --- .../athena_offline_store/tests/data_source.py | 31 ++++--------------- 1 file changed, 6 insertions(+), 25 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/tests/data_source.py index 87d0afc622..384ab69e81 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/tests/data_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/tests/data_source.py @@ -27,31 +27,12 @@ class AthenaDataSourceCreator(DataSourceCreator): def __init__(self, project_name: str, *args, **kwargs): super().__init__(project_name) - region = ( - os.environ.get("ATHENA_REGION") - if os.environ.get("ATHENA_REGION") - else "ap-northeast-2" - ) - data_source = ( - os.environ.get("ATHENA_DATA_SOURCE") - if os.environ.get("ATHENA_DATA_SOURCE") - else "AwsDataCatalog" - ) - database = ( - os.environ.get("ATHENA_DATABASE") - if os.environ.get("ATHENA_DATABASE") - else "default" - ) - workgroup = ( - os.environ.get("ATHENA_WORKGROUP") - if os.environ.get("ATHENA_WORKGROUP") - else "primary" - ) - bucket_name = ( - os.environ.get("ATHENA_S3_BUCKET_NAME") - if os.environ.get("ATHENA_S3_BUCKET_NAME") - else "feast-integration-tests" - ) + + region = os.getenv("ATHENA_REGION", "ap-northeast-2") + data_source = os.getenv("ATHENA_DATA_SOURCE", "AwsDataCatalog") + database = os.getenv("ATHENA_DATABASE", "default") + workgroup = os.getenv("ATHENA_WORKGROUP", "primary") + bucket_name = os.getenv("ATHENA_S3_BUCKET_NAME", "feast-integration-tests") self.client = aws_utils.get_athena_data_client(region) self.s3 = aws_utils.get_s3_resource(region)