
Commit 95a245a

AWS Template improvements (input prompt for configs, default to Redshift) (#1731)

* AWS Template improvements (input prompt for configs, default to Redshift)

Signed-off-by: Tsotne Tabidze <[email protected]>

* Add inquirer library to setup.py

Signed-off-by: Tsotne Tabidze <[email protected]>

* Remove inquirer library and fix linter

Signed-off-by: Tsotne Tabidze <[email protected]>

* Fix test_cli_aws.py

Signed-off-by: Tsotne Tabidze <[email protected]>
Tsotne Tabidze authored Jul 28, 2021
1 parent 7972992 commit 95a245a
Showing 21 changed files with 194 additions and 83 deletions.
2 changes: 1 addition & 1 deletion sdk/python/feast/client.py
@@ -826,7 +826,7 @@ def ingest(
>>> client = Client(core_url="localhost:6565")
>>> ft_df = pd.DataFrame(
>>> {
>>> "datetime": [pd.datetime.now()],
>>> "event_timestamp": [pd.datetime.now()],
>>> "driver": [1001],
>>> "rating": [4.3],
>>> }
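A note on this hunk: the docstring's example is cut off by the diff view, and pd.datetime has since been deprecated in pandas. A standalone, runnable version of the renamed construction might read as follows (pd.Timestamp.now() substituted for pd.datetime.now() is an editorial assumption, not part of the commit):

import pandas as pd

ft_df = pd.DataFrame(
    {
        "event_timestamp": [pd.Timestamp.now()],
        "driver": [1001],
        "rating": [4.3],
    }
)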
3 changes: 0 additions & 3 deletions sdk/python/feast/constants.py
@@ -49,9 +49,6 @@ def __new__(cls, name, bases, attrs):
return super().__new__(cls, name, bases, attrs)


#: Default datetime column name for point-in-time join
DATETIME_COLUMN: str = "datetime"

#: Environmental variable to specify Feast configuration file location
FEAST_CONFIG_FILE_ENV: str = "FEAST_CONFIG"

12 changes: 6 additions & 6 deletions sdk/python/feast/driver_test_data.py
@@ -91,7 +91,7 @@ def create_driver_hourly_stats_df(drivers, start_date, end_date) -> pd.DataFrame
"""
Example df generated by this function:
| datetime | driver_id | conv_rate | acc_rate | avg_daily_trips | created |
| event_timestamp | driver_id | conv_rate | acc_rate | avg_daily_trips | created |
|------------------+-----------+-----------+----------+-----------------+------------------|
| 2021-03-17 19:31 | 5010 | 0.229297 | 0.685843 | 861 | 2021-03-24 19:34 |
| 2021-03-17 20:31 | 5010 | 0.781655 | 0.861280 | 769 | 2021-03-24 19:34 |
@@ -107,7 +107,7 @@ def create_driver_hourly_stats_df(drivers, start_date, end_date) -> pd.DataFrame
"""
df_hourly = pd.DataFrame(
{
"datetime": [
"event_timestamp": [
pd.Timestamp(dt, unit="ms", tz="UTC").round("ms")
for dt in pd.date_range(
start=start_date, end=end_date, freq="1H", closed="left"
@@ -129,7 +129,7 @@ def create_driver_hourly_stats_df(drivers, start_date, end_date) -> pd.DataFrame
df_all_drivers = pd.concat([df_hourly_copy, df_all_drivers])

df_all_drivers.reset_index(drop=True, inplace=True)
rows = df_all_drivers["datetime"].count()
rows = df_all_drivers["event_timestamp"].count()

df_all_drivers["conv_rate"] = np.random.random(size=rows).astype(np.float32)
df_all_drivers["acc_rate"] = np.random.random(size=rows).astype(np.float32)
@@ -152,7 +152,7 @@ def create_customer_daily_profile_df(customers, start_date, end_date) -> pd.Data
"""
Example df generated by this function:
| datetime | customer_id | current_balance | avg_passenger_count | lifetime_trip_count | created |
| event_timestamp | customer_id | current_balance | avg_passenger_count | lifetime_trip_count | created |
|------------------+-------------+-----------------+---------------------+---------------------+------------------|
| 2021-03-17 19:31 | 1010 | 0.889188 | 0.049057 | 412 | 2021-03-24 19:38 |
| 2021-03-18 19:31 | 1010 | 0.979273 | 0.212630 | 639 | 2021-03-24 19:38 |
@@ -168,7 +168,7 @@ def create_customer_daily_profile_df(customers, start_date, end_date) -> pd.Data
"""
df_daily = pd.DataFrame(
{
"datetime": [
"event_timestamp": [
pd.Timestamp(dt, unit="ms", tz="UTC").round("ms")
for dt in pd.date_range(
start=start_date, end=end_date, freq="1D", closed="left"
@@ -185,7 +185,7 @@ def create_customer_daily_profile_df(customers, start_date, end_date) -> pd.Data

df_all_customers.reset_index(drop=True, inplace=True)

rows = df_all_customers["datetime"].count()
rows = df_all_customers["event_timestamp"].count()

df_all_customers["current_balance"] = np.random.random(size=rows).astype(np.float32)
df_all_customers["avg_passenger_count"] = np.random.random(size=rows).astype(
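The pattern these generators follow, with the renamed column, reduces to a short self-contained sketch (dates and value columns below are illustrative; closed="left" matches the pandas API of this era):

import numpy as np
import pandas as pd

start_date = pd.Timestamp("2021-03-17")
end_date = pd.Timestamp("2021-03-20")

df = pd.DataFrame(
    {
        "event_timestamp": [
            pd.Timestamp(dt, unit="ms", tz="UTC").round("ms")
            for dt in pd.date_range(start=start_date, end=end_date, freq="1H", closed="left")
        ]
    }
)

# Row count is taken from the renamed column, as in the diff above
rows = df["event_timestamp"].count()
df["conv_rate"] = np.random.random(size=rows).astype(np.float32)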
2 changes: 0 additions & 2 deletions sdk/python/feast/feature_store.py
@@ -389,8 +389,6 @@ def get_historical_features(
"""
_feature_refs = self._get_features(features, feature_refs)

print(f"_feature_refs: {_feature_refs}")

all_feature_views = self._registry.list_feature_views(project=self.project)
feature_views = list(
view for view, _ in _group_feature_refs(_feature_refs, all_feature_views)
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/offline_stores/file.py
@@ -207,7 +207,7 @@ def evaluate_historical_retrieval():
# Ensure that we delete dataframes to free up memory
del df_to_join

# Move "datetime" column to front
# Move "event_timestamp" column to front
current_cols = entity_df_with_features.columns.tolist()
current_cols.remove(entity_df_event_timestamp_col)
entity_df_with_features = entity_df_with_features[
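The updated comment describes moving the event timestamp column to the front of the frame. In isolation, the operation looks like this (df stands in for entity_df_with_features, and "event_timestamp" for entity_df_event_timestamp_col; both names here are stand-ins):

import pandas as pd

df = pd.DataFrame(
    {"driver_id": [1001], "conv_rate": [0.5], "event_timestamp": [pd.Timestamp.now()]}
)

# Reorder so the event timestamp column comes first, preserving the rest
current_cols = df.columns.tolist()
current_cols.remove("event_timestamp")
df = df[["event_timestamp"] + current_cols]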
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/offline_stores/file_source.py
@@ -35,7 +35,7 @@ def __init__(
or view. Only used for feature columns, not entities or timestamp columns.
Examples:
>>> FileSource(path="/data/my_features.parquet", event_timestamp_column="datetime")
>>> FileSource(path="/data/my_features.parquet", event_timestamp_column="event_timestamp")
"""
if path is None and file_url is None:
raise ValueError(
4 changes: 1 addition & 3 deletions sdk/python/feast/infra/offline_stores/redshift_source.py
@@ -90,9 +90,7 @@ def to_proto(self) -> DataSourceProto:
def validate(self, config: RepoConfig):
# As long as the query gets successfully executed, or the table exists,
# the data source is validated. We don't need the results though.
# TODO: uncomment this
# self.get_table_column_names_and_types(config)
print("Validate", self.get_table_column_names_and_types(config))
self.get_table_column_names_and_types(config)

def get_table_query_string(self) -> str:
"""Returns a string that can directly be used to reference this table in SQL"""
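After this change, validation simply runs the metadata query and treats success as proof that the source exists. A minimal sketch of that pattern, with a hypothetical get_columns() stub standing in for Redshift introspection (not a Feast API):

def get_columns(table):
    # Hypothetical stand-in for get_table_column_names_and_types()
    catalog = {"feast_driver_hourly_stats": [("event_timestamp", "timestamp")]}
    if table not in catalog:
        raise ValueError(f"Table {table} does not exist")
    return catalog[table]


def validate(table):
    # A successful metadata query is the validation; the returned
    # schema is intentionally discarded, as in the updated validate()
    get_columns(table)


validate("feast_driver_hourly_stats")  # passes; an unknown table raises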
2 changes: 1 addition & 1 deletion sdk/python/feast/repo_config.py
@@ -164,7 +164,7 @@ def _validate_offline_store_config(cls, values):
elif values["provider"] == "gcp":
values["offline_store"]["type"] = "bigquery"
elif values["provider"] == "aws":
values["offline_store"]["type"] = "file"
values["offline_store"]["type"] = "redshift"

offline_store_type = values["offline_store"]["type"]

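The provider-to-offline-store defaulting after this change amounts to a small mapping; a sketch (the local/file branch is inferred from surrounding code and is an assumption, since it is not visible in this hunk):

def default_offline_store_type(provider):
    # "aws" now defaults to Redshift rather than the file-based store
    defaults = {"local": "file", "gcp": "bigquery", "aws": "redshift"}
    return defaults[provider]


assert default_offline_store_type("aws") == "redshift"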
57 changes: 49 additions & 8 deletions sdk/python/feast/templates/aws/bootstrap.py
@@ -1,3 +1,8 @@
import click

from feast.infra.utils import aws_utils


def bootstrap():
# Bootstrap() will automatically be called from the init_repo() during `feast init`

@@ -6,21 +11,57 @@ def bootstrap():

from feast.driver_test_data import create_driver_hourly_stats_df

repo_path = pathlib.Path(__file__).parent.absolute()
data_path = repo_path / "data"
data_path.mkdir(exist_ok=True)

end_date = datetime.now().replace(microsecond=0, second=0, minute=0)
start_date = end_date - timedelta(days=15)

driver_entities = [1001, 1002, 1003, 1004, 1005]
driver_df = create_driver_hourly_stats_df(driver_entities, start_date, end_date)

driver_stats_path = data_path / "driver_stats.parquet"
driver_df.to_parquet(path=str(driver_stats_path), allow_truncated_timestamps=True)
aws_region = click.prompt("AWS Region (e.g. us-west-2)")
cluster_id = click.prompt("Redshift Cluster ID")
database = click.prompt("Redshift Database Name")
user = click.prompt("Redshift User Name")
s3_staging_location = click.prompt("Redshift S3 Staging Location (s3://*)")
iam_role = click.prompt("Redshift IAM Role for S3 (arn:aws:iam::*:role/*)")

if click.confirm(
"Should I upload example data to Redshift (overwriting 'feast_driver_hourly_stats' table)?",
default=True,
):
client = aws_utils.get_redshift_data_client(aws_region)
s3 = aws_utils.get_s3_resource(aws_region)

aws_utils.execute_redshift_statement(
client,
cluster_id,
database,
user,
"DROP TABLE IF EXISTS feast_driver_hourly_stats",
)

aws_utils.upload_df_to_redshift(
client,
cluster_id,
database,
user,
s3,
f"{s3_staging_location}/data/feast_driver_hourly_stats.parquet",
iam_role,
"feast_driver_hourly_stats",
driver_df,
)

repo_path = pathlib.Path(__file__).parent.absolute()
config_file = repo_path / "feature_store.yaml"

example_py_file = repo_path / "example.py"
replace_str_in_file(example_py_file, "%PARQUET_PATH%", str(driver_stats_path))
replace_str_in_file(config_file, "%AWS_REGION%", aws_region)
replace_str_in_file(config_file, "%REDSHIFT_CLUSTER_ID%", cluster_id)
replace_str_in_file(config_file, "%REDSHIFT_DATABASE%", database)
replace_str_in_file(config_file, "%REDSHIFT_USER%", user)
replace_str_in_file(
config_file, "%REDSHIFT_S3_STAGING_LOCATION%", s3_staging_location
)
replace_str_in_file(config_file, "%REDSHIFT_IAM_ROLE%", iam_role)


def replace_str_in_file(file_path, match_str, sub_str):
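The body of replace_str_in_file() is cut off by the diff view. Judging from how it is called above, a plausible implementation reads the file, substitutes the placeholder, and writes the result back; the body below is an editorial assumption, not the commit's actual code:

def replace_str_in_file(file_path, match_str, sub_str):
    with open(file_path, "r") as f:
        contents = f.read()
    contents = contents.replace(match_str, sub_str)  # assumed plain substitution
    with open(file_path, "w") as f:
        f.write(contents)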
64 changes: 64 additions & 0 deletions sdk/python/feast/templates/aws/driver_repo.py
@@ -0,0 +1,64 @@
from datetime import timedelta

from feast import Entity, Feature, FeatureView, RedshiftSource, ValueType

# Define an entity for the driver. Entities can be thought of as primary keys used to
# retrieve features. Entities are also used to join multiple tables/views during the
# construction of feature vectors
driver = Entity(
# Name of the entity. Must be unique within a project
name="driver_id",
# The join key of an entity describes the storage level field/column on which
# features can be looked up. The join key is also used to join feature
# tables/views when building feature vectors
join_key="driver_id",
# The storage level type for an entity
value_type=ValueType.INT64,
)

# Indicates a data source from which feature values can be retrieved. Sources are queried when building training
# datasets or materializing features into an online store.
driver_stats_source = RedshiftSource(
# The Redshift table where features can be found
table="feast_driver_hourly_stats",
# The event timestamp is used for point-in-time joins and for ensuring only
# features within the TTL are returned
event_timestamp_column="event_timestamp",
# The (optional) created timestamp is used to ensure there are no duplicate
# feature rows in the offline store or when building training datasets
created_timestamp_column="created",
)

# Feature views are a grouping based on how features are stored in either the
# online or offline store.
driver_stats_fv = FeatureView(
# The unique name of this feature view. Two feature views in a single
# project cannot have the same name
name="driver_hourly_stats",
# The list of entities specifies the keys required for joining or looking
# up features from this feature view. The reference provided in this field
# correspond to the name of a defined entity (or entities)
entities=["driver_id"],
# The timedelta is the maximum age that each feature value may have
# relative to its lookup time. For historical features (used in training),
# TTL is relative to each timestamp provided in the entity dataframe.
# TTL also allows for eviction of keys from online stores and limits the
# amount of historical scanning required for historical feature values
# during retrieval
ttl=timedelta(weeks=52),
# The list of features defined below act as a schema to both define features
# for both materialization of features into a store, and are used as references
# during retrieval for building a training dataset or serving features
features=[
Feature(name="conv_rate", dtype=ValueType.FLOAT),
Feature(name="acc_rate", dtype=ValueType.FLOAT),
Feature(name="avg_daily_trips", dtype=ValueType.INT64),
],
# Batch sources are used to find feature values. In the case of this feature
# view we will query a source table on Redshift for driver statistics
# features
batch_source=driver_stats_source,
# Tags are user defined key/value pairs that are attached to each
# feature view
tags={"team": "driver_performance"},
)
35 changes: 0 additions & 35 deletions sdk/python/feast/templates/aws/example.py

This file was deleted.

11 changes: 11 additions & 0 deletions sdk/python/feast/templates/aws/feature_store.yaml
@@ -1,3 +1,14 @@
project: my_project
registry: data/registry.db
provider: aws
online_store:
type: dynamodb
region: %AWS_REGION%
offline_store:
type: redshift
cluster_id: %REDSHIFT_CLUSTER_ID%
region: %AWS_REGION%
database: %REDSHIFT_DATABASE%
user: %REDSHIFT_USER%
s3_staging_location: %REDSHIFT_S3_STAGING_LOCATION%
iam_role: %REDSHIFT_IAM_ROLE%
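Once bootstrap.py prompts for the configuration values and substitutes the %...% placeholders, the generated file would look something like this (every value below is a hypothetical example):

project: my_project
registry: data/registry.db
provider: aws
online_store:
    type: dynamodb
    region: us-west-2
offline_store:
    type: redshift
    cluster_id: my-feast-cluster
    region: us-west-2
    database: dev
    user: awsuser
    s3_staging_location: s3://my-feast-bucket/staging
    iam_role: arn:aws:iam::123456789012:role/my-redshift-s3-role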
40 changes: 34 additions & 6 deletions sdk/python/feast/templates/aws/test.py
@@ -1,7 +1,7 @@
from datetime import datetime
from datetime import datetime, timedelta

import pandas as pd
from example import driver, driver_hourly_stats_view
from driver_repo import driver, driver_stats_fv

from feast import FeatureStore

@@ -15,21 +15,49 @@ def main():

# Deploy the feature store to AWS
print("Deploying feature store to AWS...")
fs.apply([driver, driver_hourly_stats_view])
fs.apply([driver, driver_stats_fv])

# Select features
feature_refs = ["driver_hourly_stats:conv_rate", "driver_hourly_stats:acc_rate"]
features = ["driver_hourly_stats:conv_rate", "driver_hourly_stats:acc_rate"]

# Create an entity dataframe. This is the dataframe that will be enriched with historical features
entity_df = pd.DataFrame(
{
"event_timestamp": [
pd.Timestamp(dt, unit="ms", tz="UTC").round("ms")
for dt in pd.date_range(
start=datetime.now() - timedelta(days=3),
end=datetime.now(),
periods=3,
)
],
"driver_id": [1001, 1002, 1003],
}
)

print("Retrieving training data...")

# Retrieve historical features by joining the entity dataframe to the Redshift table source
training_df = fs.get_historical_features(
features=features, entity_df=entity_df
).to_df()

print()
print(training_df)

print()
print("Loading features into the online store...")
fs.materialize_incremental(end_date=datetime.now())

print()
print("Retrieving online features...")

# Retrieve features from the online store (DynamoDB)
# Retrieve features from the online store (Firestore)
online_features = fs.get_online_features(
features=feature_refs, entity_rows=[{"driver_id": 1001}, {"driver_id": 1002}],
features=features, entity_rows=[{"driver_id": 1001}, {"driver_id": 1002}],
).to_dict()

print()
print(pd.DataFrame.from_dict(online_features))


4 changes: 2 additions & 2 deletions sdk/python/feast/templates/gcp/driver_repo.py
@@ -20,10 +20,10 @@
# datasets or materializing features into an online store.
driver_stats_source = BigQuerySource(
# The BigQuery table where features can be found
table_ref="feast-oss.demo_data.driver_hourly_stats",
table_ref="feast-oss.demo_data.driver_hourly_stats_2",
# The event timestamp is used for point-in-time joins and for ensuring only
# features within the TTL are returned
event_timestamp_column="datetime",
event_timestamp_column="event_timestamp",
# The (optional) created timestamp is used to ensure there are no duplicate
# feature rows in the offline store or when building training datasets
created_timestamp_column="created",
2 changes: 1 addition & 1 deletion sdk/python/feast/templates/local/example.py
@@ -9,7 +9,7 @@
# for more info.
driver_hourly_stats = FileSource(
path="%PARQUET_PATH%",
event_timestamp_column="datetime",
event_timestamp_column="event_timestamp",
created_timestamp_column="created",
)
