Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch ingestion fix #299

Merged
merged 1 commit into from
Nov 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sdk/python/feast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,8 @@ def get_batch_features(

# Retrieve serving information to determine store type and staging location
serving_info = (
self._serving_service_stub.GetFeastServingInfo()
self._serving_service_stub.GetFeastServingInfo(
GetFeastServingInfoRequest(), timeout=GRPC_CONNECTION_TIMEOUT_DEFAULT)
) # type: GetFeastServingInfoResponse

if serving_info.type != FeastServingType.FEAST_SERVING_TYPE_BATCH:
Expand Down
65 changes: 44 additions & 21 deletions serving/src/main/resources/templates/bq_featureset_query.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
SELECT {{ fullEntitiesList | join(', ')}}, {% for featureSet in featureSets %}{% for featureName in featureSet.features %}{{ featureSet.name }}_v{{ featureSet.version }}_{{ featureName }}, {% endfor %}{% endfor %}event_timestamp FROM (WITH union_features AS (
SELECT
WITH union_features AS (SELECT
event_timestamp,
{% for featureSet in featureSets %}NULL as {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp,
{% endfor %}{{ fullEntitiesList | join(', ')}},
Expand All @@ -24,38 +23,62 @@ SELECT
{% endif %}
{% endfor %}
false AS is_entity_table
FROM `{{projectId}}.{{datasetId}}.{{ featureSet.name }}_v{{ featureSet.version }}` WHERE event_timestamp <= '{{maxTimestamp}}' AND event_timestamp >= '{{minTimestamp}}'
FROM `{{projectId}}.{{datasetId}}.{{ featureSet.name }}_v{{ featureSet.version }}` WHERE event_timestamp <= '{{maxTimestamp}}' AND event_timestamp >= Timestamp_sub(TIMESTAMP '{{ minTimestamp }}', interval {{ featureSet.maxAge }} second)
{% endfor %}
)
SELECT
event_timestamp,
{% for featureSet in featureSets %}
IF(event_timestamp >= {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp AND Timestamp_sub(event_timestamp, interval {{ featureSet.maxAge }} second) < {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp, {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp, NULL) as {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp,
{% endfor %}
{{ fullEntitiesList | join(', ')}}
FROM (
), ts_union AS (
{% for featureSet in featureSets %}
SELECT * FROM (
SELECT
event_timestamp,
{{ fullEntitiesList | join(', ')}},
{% for otherFeatureSet in featureSets %}
{% if otherFeatureSet.id == featureSet.id %}
LAST_VALUE({{ otherFeatureSet.name }}_v{{ otherFeatureSet.version }}_feature_timestamp IGNORE NULLS) over w AS {{ otherFeatureSet.name }}_v{{ otherFeatureSet.version }}_feature_timestamp,
{% else %}
{{ otherFeatureSet.name }}_v{{ otherFeatureSet.version }}_feature_timestamp,
{% endif %}
{% endfor %}
is_entity_table
FROM union_features
WINDOW w AS (PARTITION BY {{ featureSet.entities | join(', ') }} ORDER BY event_timestamp, is_entity_table ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
) WHERE is_entity_table
{% if loop.last %}
{% else %}
UNION ALL
{% endif %}
{% endfor %}
), ts_coalesce AS (
SELECT
event_timestamp,
{{ fullEntitiesList | join(', ')}},
{% for featureSet in featureSets %}
LAST_VALUE({{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp IGNORE NULLS) over w AS {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp,
{% endfor %}
is_entity_table
FROM union_features
WINDOW w AS (PARTITION BY source ORDER BY event_timestamp ASC)
) WHERE is_entity_table)
ROW_NUMBER() over w as rn
FROM ts_union
WINDOW w AS (PARTITION BY {{ fullEntitiesList | join(', ')}}, event_timestamp ORDER BY event_timestamp)
), ts_final AS (
SELECT
event_timestamp,
{{ fullEntitiesList | join(', ')}},
{% for featureSet in featureSets %}
IF(event_timestamp >= {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp AND Timestamp_sub(event_timestamp, interval {{ featureSet.maxAge }} second) < {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp, {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp, NULL) as {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM ts_coalesce WHERE rn = 1
)
SELECT * FROM ts_final
{% for featureSet in featureSets %}
LEFT JOIN
(SELECT {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp,
LEFT JOIN
(SELECT {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp,
{% for featureName in featureSet.features %}{{ featureSet.name }}_v{{ featureSet.version }}_{{ featureName }},
{% endfor %}{{ featureSet.entities | join(', ') }}
FROM (SELECT
FROM (SELECT
event_timestamp as {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp,
{{ featureSet.entities | join(', ') }},
{% for featureName in featureSet.features %}
{{ featureName }} as {{ featureSet.name }}_v{{ featureSet.version }}_{{ featureName }},
{% endfor %}ROW_NUMBER() OVER(PARTITION BY event_timestamp, {{ featureSet.entities | join(', ') }} ORDER BY created_timestamp DESC) as {{ featureSet.name }}_v{{ featureSet.version }}_rown
FROM `{{ projectId }}.{{ datasetId }}.{{ featureSet.name }}_v{{ featureSet.version }}` WHERE event_timestamp <= '{{maxTimestamp}}' AND event_timestamp >= '{{minTimestamp}}'
FROM `{{ projectId }}.{{ datasetId }}.{{ featureSet.name }}_v{{ featureSet.version }}` WHERE event_timestamp <= '{{maxTimestamp}}' AND event_timestamp >= Timestamp_sub(TIMESTAMP '{{ minTimestamp }}', interval {{ featureSet.maxAge }} second)
) WHERE {{ featureSet.name }}_v{{ featureSet.version }}_rown = 1
) USING ({{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp, source)
{% endfor %}
) USING ({{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp, {{ featureSet.entities | join(', ') }})
{% endfor %}