From c33aa5fa8769fe7f63c013b641011cbbb9cd2205 Mon Sep 17 00:00:00 2001 From: zhilingc Date: Sun, 10 Nov 2019 14:55:47 +0800 Subject: [PATCH] Fix batch client call missing request, fix multi-entity lookup --- sdk/python/feast/client.py | 3 +- .../templates/bq_featureset_query.sql | 65 +++++++++++++------ 2 files changed, 46 insertions(+), 22 deletions(-) diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index 825e4f20a8..8816bd46f0 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -349,7 +349,8 @@ def get_batch_features( # Retrieve serving information to determine store type and staging location serving_info = ( - self._serving_service_stub.GetFeastServingInfo() + self._serving_service_stub.GetFeastServingInfo( + GetFeastServingInfoRequest(), timeout=GRPC_CONNECTION_TIMEOUT_DEFAULT) ) # type: GetFeastServingInfoResponse if serving_info.type != FeastServingType.FEAST_SERVING_TYPE_BATCH: diff --git a/serving/src/main/resources/templates/bq_featureset_query.sql b/serving/src/main/resources/templates/bq_featureset_query.sql index a44365e7fb..237fe3ba94 100644 --- a/serving/src/main/resources/templates/bq_featureset_query.sql +++ b/serving/src/main/resources/templates/bq_featureset_query.sql @@ -1,5 +1,4 @@ -SELECT {{ fullEntitiesList | join(', ')}}, {% for featureSet in featureSets %}{% for featureName in featureSet.features %}{{ featureSet.name }}_v{{ featureSet.version }}_{{ featureName }}, {% endfor %}{% endfor %}event_timestamp FROM (WITH union_features AS ( -SELECT +WITH union_features AS (SELECT event_timestamp, {% for featureSet in featureSets %}NULL as {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp, {% endfor %}{{ fullEntitiesList | join(', ')}}, @@ -24,38 +23,62 @@ SELECT {% endif %} {% endfor %} false AS is_entity_table -FROM `{{projectId}}.{{datasetId}}.{{ featureSet.name }}_v{{ featureSet.version }}` WHERE event_timestamp <= '{{maxTimestamp}}' AND event_timestamp >= '{{minTimestamp}}' +FROM `{{projectId}}.{{datasetId}}.{{ featureSet.name }}_v{{ featureSet.version }}` WHERE event_timestamp <= '{{maxTimestamp}}' AND event_timestamp >= Timestamp_sub(TIMESTAMP '{{ minTimestamp }}', interval {{ featureSet.maxAge }} second) {% endfor %} -) -SELECT - event_timestamp, - {% for featureSet in featureSets %} - IF(event_timestamp >= {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp AND Timestamp_sub(event_timestamp, interval {{ featureSet.maxAge }} second) < {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp, {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp, NULL) as {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp, - {% endfor %} - {{ fullEntitiesList | join(', ')}} - FROM ( +), ts_union AS ( +{% for featureSet in featureSets %} + SELECT * FROM ( + SELECT + event_timestamp, + {{ fullEntitiesList | join(', ')}}, + {% for otherFeatureSet in featureSets %} + {% if otherFeatureSet.id == featureSet.id %} + LAST_VALUE({{ otherFeatureSet.name }}_v{{ otherFeatureSet.version }}_feature_timestamp IGNORE NULLS) over w AS {{ otherFeatureSet.name }}_v{{ otherFeatureSet.version }}_feature_timestamp, + {% else %} + {{ otherFeatureSet.name }}_v{{ otherFeatureSet.version }}_feature_timestamp, + {% endif %} + {% endfor %} + is_entity_table + FROM union_features + WINDOW w AS (PARTITION BY {{ featureSet.entities | join(', ') }} ORDER BY event_timestamp, is_entity_table ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) + ) WHERE is_entity_table + {% if loop.last %} + {% else %} + UNION ALL + {% endif %} +{% endfor %} +), ts_coalesce AS ( SELECT event_timestamp, {{ fullEntitiesList | join(', ')}}, {% for featureSet in featureSets %} LAST_VALUE({{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp IGNORE NULLS) over w AS {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp, {% endfor %} - is_entity_table - FROM union_features - WINDOW w AS (PARTITION BY source ORDER BY event_timestamp ASC) -) WHERE is_entity_table) + ROW_NUMBER() over w as rn + FROM ts_union + WINDOW w AS (PARTITION BY {{ fullEntitiesList | join(', ')}}, event_timestamp ORDER BY event_timestamp) +), ts_final AS ( +SELECT + event_timestamp, + {{ fullEntitiesList | join(', ')}}, + {% for featureSet in featureSets %} + IF(event_timestamp >= {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp AND Timestamp_sub(event_timestamp, interval {{ featureSet.maxAge }} second) < {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp, {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp, NULL) as {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp{% if loop.last %}{% else %}, {% endif %} + {% endfor %} + FROM ts_coalesce WHERE rn = 1 +) +SELECT * FROM ts_final {% for featureSet in featureSets %} -LEFT JOIN -(SELECT {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp, +LEFT JOIN +(SELECT {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp, {% for featureName in featureSet.features %}{{ featureSet.name }}_v{{ featureSet.version }}_{{ featureName }}, {% endfor %}{{ featureSet.entities | join(', ') }} - FROM (SELECT + FROM (SELECT event_timestamp as {{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp, {{ featureSet.entities | join(', ') }}, {% for featureName in featureSet.features %} {{ featureName }} as {{ featureSet.name }}_v{{ featureSet.version }}_{{ featureName }}, {% endfor %}ROW_NUMBER() OVER(PARTITION BY event_timestamp, {{ featureSet.entities | join(', ') }} ORDER BY created_timestamp DESC) as {{ featureSet.name }}_v{{ featureSet.version }}_rown - FROM `{{ projectId }}.{{ datasetId }}.{{ featureSet.name }}_v{{ featureSet.version }}` WHERE event_timestamp <= '{{maxTimestamp}}' AND event_timestamp >= '{{minTimestamp}}' + FROM `{{ projectId }}.{{ datasetId }}.{{ featureSet.name }}_v{{ featureSet.version }}` WHERE event_timestamp <= '{{maxTimestamp}}' AND event_timestamp >= Timestamp_sub(TIMESTAMP '{{ minTimestamp }}', interval {{ featureSet.maxAge }} second) ) WHERE {{ featureSet.name }}_v{{ featureSet.version }}_rown = 1 -) USING ({{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp, source) -{% endfor %} \ No newline at end of file +) USING ({{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp, {{ featureSet.entities | join(', ') }}) +{% endfor %}