Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Historical field mappings #2251

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
2a7935b
historical_field_mappings fix CONTRIBUTING docs for creating venv
michelle-rascati-sp Jan 26, 2022
8b95281
historical_field_mappings fix mypy-protobuf conflict
michelle-rascati-sp Jan 26, 2022
6a8d126
historical_field_mappings adding tests for get_historical_features to…
michelle-rascati-sp Jan 27, 2022
99bf1ed
historical_field_mappings bigquery tests passing
michelle-rascati-sp Jan 27, 2022
a895594
historical_field_mappings redshift tests pass
michelle-rascati-sp Jan 27, 2022
12753f3
historical_field_mappings formatting
michelle-rascati-sp Jan 27, 2022
d15a210
historical_field_mappings make required so no .get() from None
michelle-rascati-sp Jan 27, 2022
ed011a3
historical_field_mappings type the registry so linter is happy
michelle-rascati-sp Jan 27, 2022
af8fc85
historical_field_mappings making pylint happy
michelle-rascati-sp Jan 27, 2022
e804fe9
historical_field_mappings formatting
michelle-rascati-sp Jan 27, 2022
b28aa79
historical_field_mappings Merge branch 'master' into historical_field…
michelle-rascati-sp Jan 27, 2022
0c1e79f
historical_field_mappings Revert "historical_field_mappings making py…
michelle-rascati-sp Jan 27, 2022
60e3692
historical_field_mappings redo FieldStatusValue
michelle-rascati-sp Jan 27, 2022
33dd5b9
historical_field_mappings already fixed upstream
michelle-rascati-sp Jan 27, 2022
c3c6747
historical_field_mappings formatting
michelle-rascati-sp Jan 27, 2022
2d43b74
historical_field_mappings remove unused import
michelle-rascati-sp Jan 27, 2022
cbcbed7
historical_field_mappings Revert "historical_field_mappings redo Fiel…
michelle-rascati-sp Jan 27, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ Setting up your development environment for Feast Python SDK / CLI:
3. _Recommended:_ Create a virtual environment to isolate development dependencies to be installed
```sh
# create & activate a virtual environment
python -v venv venv/
python -m venv venv/
source venv/bin/activate
```

Expand Down
26 changes: 26 additions & 0 deletions sdk/python/feast/driver_test_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,3 +264,29 @@ def create_global_daily_stats_df(start_date, end_date) -> pd.DataFrame:
# TODO: Remove created timestamp in order to test whether its really optional
df_daily["created"] = pd.to_datetime(pd.Timestamp.now(tz=None).round("ms"))
return df_daily


def create_field_mapping_df(start_date, end_date) -> pd.DataFrame:
    """
    Build a small synthetic dataframe for exercising field-mapping behavior.

    The frame holds, in build order: a random INT32 "column_name" feature,
    an event timestamp (cycling through the four EventTimestampType variants),
    and a constant "created" timestamp.

    Example rows produced by this function:
    | event_timestamp  | column_name | created          |
    |------------------+-------------+------------------|
    | 2021-03-17 19:00 | 99          | 2021-03-24 19:38 |
    | 2021-03-17 19:00 | 22          | 2021-03-24 19:38 |
    | 2021-03-17 19:00 | 7           | 2021-03-24 19:38 |
    | 2021-03-17 19:00 | 45          | 2021-03-24 19:38 |
    """
    num_rows = 10
    df = pd.DataFrame()
    # Random feature values; int32 so offline stores infer a 32-bit column.
    df["column_name"] = np.random.randint(1, 100, size=num_rows).astype(np.int32)
    # Evenly spaced timestamps across the requested window, each converted
    # with one of the 4 EventTimestampType variants in round-robin order.
    timestamps = pd.date_range(start=start_date, end=end_date, periods=num_rows)
    df[DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL] = [
        _convert_event_timestamp(
            pd.Timestamp(ts, unit="ms", tz="UTC").round("ms"),
            EventTimestampType(i % 4),
        )
        for i, ts in enumerate(timestamps)
    ]
    # TODO confirm: "created" appears optional upstream (see note in
    # create_global_daily_stats_df); kept here for parity with other fixtures.
    df["created"] = pd.to_datetime(pd.Timestamp.now(tz=None).round("ms"))
    return df
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ def _get_bigquery_client(project: Optional[str] = None, location: Optional[str]
{{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }}
{{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %}
{% for feature in featureview.features %}
{{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
{{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM {{ featureview.table_subquery }}
WHERE {{ featureview.event_timestamp_column }} <= '{{ featureview.max_event_timestamp }}'
Expand Down Expand Up @@ -699,7 +699,7 @@ def _get_bigquery_client(project: Optional[str] = None, location: Optional[str]
SELECT
{{featureview.name}}__entity_row_unique_id
{% for feature in featureview.features %}
,{% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}
,{% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}
{% endfor %}
FROM {{ featureview.name }}__cleaned
) USING ({{featureview.name}}__entity_row_unique_id)
Expand Down
12 changes: 10 additions & 2 deletions sdk/python/feast/infra/offline_stores/offline_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class FeatureViewQueryContext:
ttl: int
entities: List[str]
features: List[str] # feature reference format
field_mapping: Dict[str, str]
event_timestamp_column: str
created_timestamp_column: Optional[str]
table_subquery: str
Expand Down Expand Up @@ -144,7 +145,10 @@ def get_feature_view_query_context(
name=feature_view.projection.name_to_use(),
ttl=ttl_seconds,
entities=join_keys,
features=features,
features=[
reverse_field_mapping.get(feature, feature) for feature in features
],
field_mapping=feature_view.input.field_mapping,
event_timestamp_column=reverse_field_mapping.get(
event_timestamp_column, event_timestamp_column
),
Expand Down Expand Up @@ -175,7 +179,11 @@ def build_point_in_time_query(
final_output_feature_names = list(entity_df_columns)
final_output_feature_names.extend(
[
(f"{fv.name}__{feature}" if full_feature_names else feature)
(
f"{fv.name}__{fv.field_mapping.get(feature, feature)}"
if full_feature_names
else fv.field_mapping.get(feature, feature)
)
for fv in feature_view_query_contexts
for feature in fv.features
]
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/offline_stores/redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ def _get_entity_df_event_timestamp_range(
{{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }}
{{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %}
{% for feature in featureview.features %}
{{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
{{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM {{ featureview.table_subquery }}
WHERE {{ featureview.event_timestamp_column }} <= '{{ featureview.max_event_timestamp }}'
Expand Down Expand Up @@ -664,7 +664,7 @@ def _get_entity_df_event_timestamp_range(
SELECT
{{featureview.name}}__entity_row_unique_id
{% for feature in featureview.features %}
,{% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}
,{% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}
{% endfor %}
FROM {{ featureview.name }}__cleaned
) USING ({{featureview.name}}__entity_row_unique_id)
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@
+ AWS_REQUIRED
)

DEV_REQUIRED = ["mypy-protobuf==1.*", "grpcio-testing==1.*"] + CI_REQUIRED
DEV_REQUIRED = ["mypy-protobuf>=1.*", "grpcio-testing==1.*"] + CI_REQUIRED

# Get git repo root directory
repo_root = str(pathlib.Path(__file__).resolve().parent.parent.parent)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
create_customer_daily_profile_feature_view,
create_driver_age_request_feature_view,
create_driver_hourly_stats_feature_view,
create_field_mapping_feature_view,
create_global_stats_feature_view,
create_location_stats_feature_view,
create_order_feature_view,
Expand Down Expand Up @@ -126,6 +127,7 @@ def construct_universal_datasets(
order_count=20,
)
global_df = driver_test_data.create_global_daily_stats_df(start_time, end_time)
field_mapping_df = driver_test_data.create_field_mapping_df(start_time, end_time)
entity_df = orders_df[
[
"customer_id",
Expand All @@ -143,6 +145,7 @@ def construct_universal_datasets(
"location": location_df,
"orders": orders_df,
"global": global_df,
"field_mapping": field_mapping_df,
"entity": entity_df,
}

Expand Down Expand Up @@ -180,12 +183,20 @@ def construct_universal_data_sources(
event_timestamp_column="event_timestamp",
created_timestamp_column="created",
)
field_mapping_ds = data_source_creator.create_data_source(
datasets["field_mapping"],
destination_name="field_mapping",
event_timestamp_column="event_timestamp",
created_timestamp_column="created",
field_mapping={"column_name": "feature_name"},
)
return {
"customer": customer_ds,
"driver": driver_ds,
"location": location_ds,
"orders": orders_ds,
"global": global_ds,
"field_mapping": field_mapping_ds,
}


Expand All @@ -210,6 +221,9 @@ def construct_universal_feature_views(
"driver_age_request_fv": create_driver_age_request_feature_view(),
"order": create_order_feature_view(data_sources["orders"]),
"location": create_location_stats_feature_view(data_sources["location"]),
"field_mapping": create_field_mapping_feature_view(
data_sources["field_mapping"]
),
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,3 +217,13 @@ def create_location_stats_feature_view(source, infer_features: bool = False):
ttl=timedelta(days=2),
)
return location_stats_feature_view


def create_field_mapping_feature_view(source):
    """
    Return a FeatureView named "field_mapping" over *source*, exposing a
    single INT32 feature called "feature_name" with a 2-day TTL and no
    entities.
    """
    mapped_features = [Feature(name="feature_name", dtype=ValueType.INT32)]
    return FeatureView(
        name="field_mapping",
        entities=[],
        features=mapped_features,
        batch_source=source,
        ttl=timedelta(days=2),
    )
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ def get_expected_training_df(
location_fv: FeatureView,
global_df: pd.DataFrame,
global_fv: FeatureView,
field_mapping_df: pd.DataFrame,
field_mapping_fv: FeatureView,
entity_df: pd.DataFrame,
event_timestamp: str,
full_feature_names: bool = False,
Expand All @@ -102,6 +104,10 @@ def get_expected_training_df(
global_records = convert_timestamp_records_to_utc(
global_df.to_dict("records"), global_fv.batch_source.event_timestamp_column
)
field_mapping_records = convert_timestamp_records_to_utc(
field_mapping_df.to_dict("records"),
field_mapping_fv.batch_source.event_timestamp_column,
)
entity_rows = convert_timestamp_records_to_utc(
entity_df.to_dict("records"), event_timestamp
)
Expand Down Expand Up @@ -156,6 +162,13 @@ def get_expected_training_df(
ts_end=order_record[event_timestamp],
)

field_mapping_record = find_asof_record(
field_mapping_records,
ts_key=field_mapping_fv.batch_source.event_timestamp_column,
ts_start=order_record[event_timestamp] - field_mapping_fv.ttl,
ts_end=order_record[event_timestamp],
)

entity_row.update(
{
(
Expand Down Expand Up @@ -197,6 +210,16 @@ def get_expected_training_df(
}
)

# get field_mapping_record by column name, but label by feature name
entity_row.update(
{
(
f"field_mapping__{feature}" if full_feature_names else feature
): field_mapping_record.get(column, None)
for (column, feature) in field_mapping_fv.input.field_mapping.items()
}
)

# Convert records back to pandas dataframe
expected_df = pd.DataFrame(entity_rows)

Expand All @@ -213,6 +236,7 @@ def get_expected_training_df(
"customer_profile__current_balance": "float32",
"customer_profile__avg_passenger_count": "float32",
"global_stats__avg_ride_length": "float32",
"field_mapping__feature_name": "int32",
}
else:
expected_column_types = {
Expand All @@ -221,6 +245,7 @@ def get_expected_training_df(
"current_balance": "float32",
"avg_passenger_count": "float32",
"avg_ride_length": "float32",
"feature_name": "int32",
}

for col, typ in expected_column_types.items():
Expand Down Expand Up @@ -311,6 +336,8 @@ def test_historical_features(environment, universal_data_sources, full_feature_n
feature_views["location"],
datasets["global"],
feature_views["global"],
datasets["field_mapping"],
feature_views["field_mapping"],
entity_df_with_request_data,
event_timestamp,
full_feature_names,
Expand All @@ -336,6 +363,7 @@ def test_historical_features(environment, universal_data_sources, full_feature_n
"global_stats:num_rides",
"global_stats:avg_ride_length",
"driver_age:driver_age",
"field_mapping:feature_name",
],
full_feature_names=full_feature_names,
)
Expand Down Expand Up @@ -404,6 +432,7 @@ def test_historical_features_with_missing_request_data(
"conv_rate_plus_100:conv_rate_plus_val_to_add",
"global_stats:num_rides",
"global_stats:avg_ride_length",
"field_mapping:feature_name",
],
full_feature_names=full_feature_names,
)
Expand All @@ -419,6 +448,7 @@ def test_historical_features_with_missing_request_data(
"driver_age:driver_age",
"global_stats:num_rides",
"global_stats:avg_ride_length",
"field_mapping:feature_name",
],
full_feature_names=full_feature_names,
)
Expand Down Expand Up @@ -452,6 +482,7 @@ def test_historical_features_with_entities_from_query(
"order:order_is_success",
"global_stats:num_rides",
"global_stats:avg_ride_length",
"field_mapping:feature_name",
],
full_feature_names=full_feature_names,
)
Expand All @@ -477,6 +508,8 @@ def test_historical_features_with_entities_from_query(
feature_views["location"],
datasets["global"],
feature_views["global"],
datasets["field_mapping"],
feature_views["field_mapping"],
datasets["entity"],
event_timestamp,
full_feature_names,
Expand Down Expand Up @@ -538,6 +571,7 @@ def test_historical_features_persisting(
"order:order_is_success",
"global_stats:num_rides",
"global_stats:avg_ride_length",
"field_mapping:feature_name",
],
full_feature_names=full_feature_names,
)
Expand All @@ -561,6 +595,8 @@ def test_historical_features_persisting(
feature_views["location"],
datasets["global"],
feature_views["global"],
datasets["field_mapping"],
feature_views["field_mapping"],
entity_df,
event_timestamp,
full_feature_names,
Expand Down