
Commit

feat: Feast AWS Athena offline store (again) (#3044)
* fixed bugs, cleaned code, added AthenaDataSourceCreator

Signed-off-by: Youngkyu OH <[email protected]>

* fixed bugs, cleaned code, added some methods. test_universal_historical_retrieval - 100% passed

Signed-off-by: Youngkyu OH <[email protected]>

* fixed bugs to pass test_validation

Signed-off-by: Youngkyu OH <[email protected]>

* changed boolean data type mapping

Signed-off-by: Youngkyu OH <[email protected]>

* 1. added test-python-universal-athena target in the Makefile; 2. replaced hardcoded database and bucket_name with variables in AthenaDataSourceCreator

Signed-off-by: Youngkyu OH <[email protected]>

* format, run lint

Signed-off-by: Youngkyu OH <[email protected]>

* revert merge changes

Signed-off-by: Danny Chiao <[email protected]>

* add entity_key_serialization

Signed-off-by: Danny Chiao <[email protected]>

* restore deleted file

Signed-off-by: Danny Chiao <[email protected]>

* renamed confusing environment variables; added instructions on how to use Athena

Signed-off-by: Youngkyu OH <[email protected]>

* enforce AthenaSource to have a name

Signed-off-by: Youngkyu OH <[email protected]>

Co-authored-by: toping4445 <[email protected]>
Co-authored-by: Danny Chiao <[email protected]>
3 people authored Aug 10, 2022
1 parent 32d2039 commit 989ce08
Showing 20 changed files with 1,782 additions and 2 deletions.
27 changes: 27 additions & 0 deletions Makefile
@@ -139,6 +139,33 @@ test-python-universal-trino:
not test_universal_types" \
sdk/python/tests

#To use Athena as an offline store, you need to create an Athena database and an S3 bucket on AWS. See https://docs.aws.amazon.com/athena/latest/ug/getting-started.html
#Modify the environment variables ATHENA_DATA_SOURCE, ATHENA_DATABASE, and ATHENA_S3_BUCKET_NAME to change the Athena data source, database, and S3 bucket to use.
#If tests fail with the pytest -n 8 option, change the number to 1.
test-python-universal-athena:
PYTHONPATH='.' \
FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.offline_stores.contrib.athena_repo_configuration \
PYTEST_PLUGINS=feast.infra.offline_stores.contrib.athena_offline_store.tests \
FEAST_USAGE=False IS_TEST=True \
ATHENA_DATA_SOURCE=AwsDataCatalog \
ATHENA_DATABASE=default \
ATHENA_S3_BUCKET_NAME=feast-integration-tests \
python -m pytest -n 8 --integration \
-k "not test_go_feature_server and \
not test_logged_features_validation and \
not test_lambda and \
not test_feature_logging and \
not test_offline_write and \
not test_push_offline and \
not test_historical_retrieval_with_validation and \
not test_historical_features_persisting and \
not test_historical_retrieval_fails_on_validation and \
not gcs_registry and \
not s3_registry" \
sdk/python/tests
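
The three ATHENA_* variables above are read by the test data source creator, with the Makefile values acting as defaults. A minimal sketch of that lookup in Python (the names on the left are illustrative; only the environment variable names and defaults are taken from the target above):

import os

# Sketch: how a data source creator such as AthenaDataSourceCreator might
# resolve the variables exported by the Makefile target above.
data_source = os.getenv("ATHENA_DATA_SOURCE", "AwsDataCatalog")
database = os.getenv("ATHENA_DATABASE", "default")
s3_bucket = os.getenv("ATHENA_S3_BUCKET_NAME", "feast-integration-tests")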



test-python-universal-postgres:
PYTHONPATH='.' \
FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.offline_stores.contrib.postgres_repo_configuration \
18 changes: 18 additions & 0 deletions protos/feast/core/DataSource.proto
@@ -49,6 +49,7 @@ message DataSource {
PUSH_SOURCE = 9;
BATCH_TRINO = 10;
BATCH_SPARK = 11;
BATCH_ATHENA = 12;
}

// Unique name of data source within the project
@@ -171,6 +172,22 @@ message DataSource {
string database = 4;
}

// Defines options for DataSource that sources features from an Athena Query
message AthenaOptions {
// Athena table name
string table = 1;

// SQL query that returns a table containing feature data. Must contain an event_timestamp column, and respective
// entity columns
string query = 2;

// Athena database name
string database = 3;

// Athena data source (data catalog) name
string data_source = 4;
}

// Defines options for DataSource that sources features from a Snowflake Query
message SnowflakeOptions {
// Snowflake table name
@@ -242,5 +259,6 @@ message DataSource {
PushOptions push_options = 22;
SparkOptions spark_options = 27;
TrinoOptions trino_options = 30;
AthenaOptions athena_options = 35;
}
}
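
On the Python side, these options surface through the new AthenaSource exported from feast (see the sdk/python/feast/__init__.py change below). A hedged sketch of constructing one; the argument names are assumed to mirror the proto fields above, with table and query as alternatives, as with other batch sources:

from feast import AthenaSource

# Sketch only: argument names mirror AthenaOptions and are not verified
# against the SDK signature. A later commit in this PR also enforces that
# every AthenaSource has an explicit name.
driver_stats = AthenaSource(
    name="driver_hourly_stats",
    timestamp_field="event_timestamp",
    table="driver_stats",  # alternatively: query="SELECT ... FROM ..."
    database="default",
    data_source="AwsDataCatalog",
)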
6 changes: 6 additions & 0 deletions protos/feast/core/FeatureService.proto
@@ -60,6 +60,7 @@ message LoggingConfig {
RedshiftDestination redshift_destination = 5;
SnowflakeDestination snowflake_destination = 6;
CustomDestination custom_destination = 7;
AthenaDestination athena_destination = 8;
}

message FileDestination {
@@ -80,6 +81,11 @@ message LoggingConfig {
string table_name = 1;
}

message AthenaDestination {
// Destination table name. data_source and database will be taken from an offline store config
string table_name = 1;
}

message SnowflakeDestination {
// Destination table name. Schema and database will be taken from an offline store config
string table_name = 1;
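
The new oneof branch can be exercised directly through the generated protos. A small sketch, assuming Feast's usual feast.protos module layout for generated code:

from feast.protos.feast.core.FeatureService_pb2 import LoggingConfig

# Sketch: route feature logging to an Athena table. Only table_name is set;
# data_source and database come from the offline store config, per the
# comment on AthenaDestination above.
logging_config = LoggingConfig(
    athena_destination=LoggingConfig.AthenaDestination(
        table_name="feature_logs",
    ),
)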
1 change: 1 addition & 0 deletions protos/feast/core/SavedDataset.proto
@@ -59,6 +59,7 @@ message SavedDatasetStorage {
DataSource.TrinoOptions trino_storage = 8;
DataSource.SparkOptions spark_storage = 9;
DataSource.CustomSourceOptions custom_storage = 10;
DataSource.AthenaOptions athena_storage = 11;
}
}
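
Feast pairs each of these storage options with a SavedDataset<Store>Storage class in the SDK, and an Athena counterpart would presumably follow the same pattern. A sketch under that assumption (class name, module path, and argument are extrapolated from the Redshift equivalent, not verified against the contrib module):

# Hypothetical, modeled on SavedDatasetRedshiftStorage.
from feast.infra.offline_stores.contrib.athena_offline_store.athena_source import (
    SavedDatasetAthenaStorage,
)

storage = SavedDatasetAthenaStorage(table_ref="my_saved_dataset")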

4 changes: 4 additions & 0 deletions sdk/python/feast/__init__.py
@@ -5,6 +5,9 @@
from importlib_metadata import PackageNotFoundError, version as _version # type: ignore

from feast.infra.offline_stores.bigquery_source import BigQuerySource
from feast.infra.offline_stores.contrib.athena_offline_store.athena_source import (
AthenaSource,
)
from feast.infra.offline_stores.file_source import FileSource
from feast.infra.offline_stores.redshift_source import RedshiftSource
from feast.infra.offline_stores.snowflake_source import SnowflakeSource
@@ -50,4 +53,5 @@
"SnowflakeSource",
"PushSource",
"RequestSource",
"AthenaSource",
]
1 change: 1 addition & 0 deletions sdk/python/feast/batch_feature_view.py
@@ -14,6 +14,7 @@
"SnowflakeSource",
"SparkSource",
"TrinoSource",
"AthenaSource",
}


9 changes: 9 additions & 0 deletions sdk/python/feast/data_source.py
@@ -156,6 +156,7 @@ def to_proto(self) -> DataSourceProto.KinesisOptions:
DataSourceProto.SourceType.BATCH_SNOWFLAKE: "feast.infra.offline_stores.snowflake_source.SnowflakeSource",
DataSourceProto.SourceType.BATCH_TRINO: "feast.infra.offline_stores.contrib.trino_offline_store.trino_source.TrinoSource",
DataSourceProto.SourceType.BATCH_SPARK: "feast.infra.offline_stores.contrib.spark_offline_store.spark_source.SparkSource",
DataSourceProto.SourceType.BATCH_ATHENA: "feast.infra.offline_stores.contrib.athena_offline_store.athena_source.AthenaSource",
DataSourceProto.SourceType.STREAM_KAFKA: "feast.data_source.KafkaSource",
DataSourceProto.SourceType.STREAM_KINESIS: "feast.data_source.KinesisSource",
DataSourceProto.SourceType.REQUEST_SOURCE: "feast.data_source.RequestSource",
@@ -183,6 +184,7 @@ class DataSource(ABC):
maintainer.
timestamp_field (optional): Event timestamp field used for point in time
joins of feature values.
date_partition_column (optional): Timestamp column used for partitioning. Not supported by all offline stores.
"""

name: str
@@ -192,6 +194,7 @@ class DataSource(ABC):
description: str
tags: Dict[str, str]
owner: str
date_partition_column: str

def __init__(
self,
@@ -203,6 +206,7 @@ def __init__(
description: Optional[str] = "",
tags: Optional[Dict[str, str]] = None,
owner: Optional[str] = "",
date_partition_column: Optional[str] = None,
):
"""
Creates a DataSource object.
@@ -220,6 +224,7 @@ def __init__(
tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
owner (optional): The owner of the data source, typically the email of the primary
maintainer.
date_partition_column (optional): Timestamp column used for partitioning. Not supported by all offline stores.
"""
self.name = name
self.timestamp_field = timestamp_field or ""
@@ -237,6 +242,9 @@ def __init__(
self.description = description or ""
self.tags = tags or {}
self.owner = owner or ""
self.date_partition_column = (
date_partition_column if date_partition_column else ""
)

def __hash__(self):
return hash((self.name, self.timestamp_field))
@@ -256,6 +264,7 @@ def __eq__(self, other):
or self.timestamp_field != other.timestamp_field
or self.created_timestamp_column != other.created_timestamp_column
or self.field_mapping != other.field_mapping
or self.date_partition_column != other.date_partition_column
or self.description != other.description
or self.tags != other.tags
or self.owner != other.owner
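
Since date_partition_column now participates in __eq__, two otherwise identical sources that differ only in their partition column compare unequal. A quick sketch using the new source type (AthenaSource forwarding the parameter to the base class is an assumption; the comparison change above is what is being illustrated):

from feast import AthenaSource

common = dict(name="s", table="t", database="default",
              data_source="AwsDataCatalog", timestamp_field="ts")
a = AthenaSource(**common)
b = AthenaSource(**common, date_partition_column="created_date")
assert a != b  # differ only in the new field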
Empty file.
(Diffs for the remaining changed files are not shown.)
