Skip to content

Commit

Permalink
Allow users not to set max age for batch retrieval (#446)
Browse files Browse the repository at this point in the history
* Allow users not to set max age for batch retrieval

* Fix typo in test assertion
  • Loading branch information
Chen Zhiling authored and Shu Heng committed Feb 13, 2020
1 parent fc879d7 commit 84cbf22
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ SELECT
created_timestamp,
{{ featureSet.entities | join(', ')}},
false AS is_entity_table
FROM `{{projectId}}.{{datasetId}}.{{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}` WHERE event_timestamp <= '{{maxTimestamp}}' AND event_timestamp >= Timestamp_sub(TIMESTAMP '{{ minTimestamp }}', interval {{ featureSet.maxAge }} second)
FROM `{{projectId}}.{{datasetId}}.{{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}` WHERE event_timestamp <= '{{maxTimestamp}}'
{% if featureSet.maxAge == 0 %}{% else %}AND event_timestamp >= Timestamp_sub(TIMESTAMP '{{ minTimestamp }}', interval {{ featureSet.maxAge }} second){% endif %}
),
/*
2. Window the data in the unioned dataset, partitioning by entity and ordering by event_timestamp, as
Expand All @@ -47,7 +48,7 @@ SELECT
event_timestamp,
{{ featureSet.entities | join(', ')}},
{% for featureName in featureSet.features %}
IF(event_timestamp >= {{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp AND Timestamp_sub(event_timestamp, interval {{ featureSet.maxAge }} second) < {{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp, {{ featureSet.project }}_{{ featureName }}_v{{ featureSet.version }}, NULL) as {{ featureSet.project }}_{{ featureName }}_v{{ featureSet.version }}{% if loop.last %}{% else %}, {% endif %}
IF(event_timestamp >= {{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp {% if featureSet.maxAge == 0 %}{% else %}AND Timestamp_sub(event_timestamp, interval {{ featureSet.maxAge }} second) < {{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp{% endif %}, {{ featureSet.project }}_{{ featureName }}_v{{ featureSet.version }}, NULL) as {{ featureSet.project }}_{{ featureName }}_v{{ featureSet.version }}{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM (
SELECT
Expand All @@ -72,7 +73,8 @@ SELECT
{% for featureName in featureSet.features %}
{{ featureName }} as {{ featureSet.project }}_{{ featureName }}_v{{ featureSet.version }}{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM `{{projectId}}.{{datasetId}}.{{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}` WHERE event_timestamp <= '{{maxTimestamp}}' AND event_timestamp >= Timestamp_sub(TIMESTAMP '{{ minTimestamp }}', interval {{ featureSet.maxAge }} second)
FROM `{{projectId}}.{{datasetId}}.{{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}` WHERE event_timestamp <= '{{maxTimestamp}}'
{% if featureSet.maxAge == 0 %}{% else %}AND event_timestamp >= Timestamp_sub(TIMESTAMP '{{ minTimestamp }}', interval {{ featureSet.maxAge }} second){% endif %}
) USING ({{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp, created_timestamp, {{ featureSet.entities | join(', ')}})
WHERE is_entity_table
)
Expand Down
33 changes: 33 additions & 0 deletions tests/e2e/bq-batch-retrieval.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,14 @@ def test_apply_all_featuresets(client):
client.apply(fs1)
client.apply(fs2)

no_max_age_fs = FeatureSet(
"no_max_age",
features=[Feature("feature_value8", ValueType.INT64)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=0),
)
client.apply(no_max_age_fs)


def test_get_batch_features_with_file(client):
file_fs1 = client.get_feature_set(name="file_feature_set", version=1)
Expand Down Expand Up @@ -327,3 +335,28 @@ def test_multiple_featureset_joins(client):

assert output["entity_id"].to_list() == [int(i) for i in output["feature_value6"].to_list()]
assert output["other_entity_id"].to_list() == output["other_feature_value7"].to_list()


def test_no_max_age(client):
    """Check that batch retrieval returns features for the "no_max_age" feature set.

    Ingests N_ROWS rows in which ``feature_value8`` equals ``entity_id``, all
    stamped with the current UTC time, then requests the same entities back
    through ``client.get_batch_features`` and verifies that every requested row
    comes back with its matching feature value.

    Args:
        client: client fixture used to fetch the feature set, ingest data and
            run the batch retrieval job.
    """
    no_max_age_fs = client.get_feature_set(name="no_max_age", version=1)

    time_offset = datetime.utcnow().replace(tzinfo=pytz.utc)
    N_ROWS = 10
    features_8_df = pd.DataFrame(
        {
            "datetime": [time_offset] * N_ROWS,
            "entity_id": list(range(N_ROWS)),
            "feature_value8": list(range(N_ROWS)),
        }
    )
    client.ingest(no_max_age_fs, features_8_df)

    # NOTE(review): presumably gives the ingested rows time to land in the
    # batch store before querying -- confirm the required delay.
    time.sleep(15)
    feature_retrieval_job = client.get_batch_features(
        entity_rows=features_8_df[["datetime", "entity_id"]],
        feature_refs=[f"{PROJECT_NAME}/feature_value8:1"],
    )

    output = feature_retrieval_job.to_dataframe()
    print(output.head())

    # Guard against a vacuous pass: two empty columns compare equal, so an
    # empty result would otherwise satisfy the equality check below.
    assert len(output) == N_ROWS
    assert output["entity_id"].to_list() == output["feature_value8"].to_list()

0 comments on commit 84cbf22

Please sign in to comment.