Infer min and max timestamps from entity_df to limit data read from BQ source (#1665)

* filter data read by timestamp inferred from entity_df when querying BQ source

Signed-off-by: Mwad22 <[email protected]>

* timestamps inferred in SQL query rather than from pandas df when querying BQ source

Signed-off-by: Mwad22 <[email protected]>

* timestamps inferred in separate SQL query to avoid recomputing, and keep retrieval query simple

Signed-off-by: Mwad22 <[email protected]>

* Added an integration test for _get_entity_df_timestamp_bounds

Signed-off-by: Mwad22 <[email protected]>

* Linted and reformatted after merge conflict

Signed-off-by: Mwad22 <[email protected]>

* removed redundant function upload_entity_df_into_bigquery from historical retrieval test

Signed-off-by: Mwad22 <[email protected]>

* Explicitly set retrieved timestamps to same timezone when testing min/max timestamp inference

Signed-off-by: Mwad22 <[email protected]>

* typo fix: was comparing object references instead of values

Signed-off-by: Mwad22 <[email protected]>

* fixed python linter issues, should be good to go

Signed-off-by: Mwad22 <[email protected]>
Mwad22 authored Jul 8, 2021
1 parent 2e0113e commit b3c0cce
Showing 2 changed files with 65 additions and 6 deletions.
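
For context, the whole change boils down to one aggregate query over the staged entity dataframe table; its result replaces the previous hard-coded one-year lookback window. A minimal standalone sketch of that pattern follows; the dataset/table name `my_dataset.entity_df` and the column name `event_timestamp` are hypothetical placeholders, and in the actual diff this logic lives in the new `_get_entity_df_timestamp_bounds` helper.

```python
from google.cloud import bigquery

client = bigquery.Client()

# Hypothetical staging table and timestamp column; in the real change these come
# from the uploaded entity_df table and its inferred event timestamp column.
entity_table = "my_dataset.entity_df"
event_timestamp_col = "event_timestamp"

# A single aggregate query returns both bounds, so the (potentially large)
# entity dataframe never has to be pulled back and scanned client-side.
boundary_df = (
    client.query(
        f"""
        SELECT
            MIN({event_timestamp_col}) AS min_timestamp,
            MAX({event_timestamp_col}) AS max_timestamp
        FROM {entity_table}
        """
    )
    .result()
    .to_dataframe()
)

min_timestamp = boundary_df.loc[0, "min_timestamp"]
max_timestamp = boundary_df.loc[0, "max_timestamp"]
print(min_timestamp, max_timestamp)
```

The two bounds are then passed into the point-in-time query builder in place of the old `datetime.now() - timedelta(days=365)` / `datetime.now() + timedelta(days=1)` values, as the diff below shows.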
41 changes: 36 additions & 5 deletions sdk/python/feast/infra/offline_stores/bigquery.py
@@ -7,6 +7,7 @@
 import pandas
 import pyarrow
 from jinja2 import BaseLoader, Environment
+from pandas import Timestamp
 from pydantic import StrictStr
 from pydantic.typing import Literal
 from tenacity import retry, stop_after_delay, wait_fixed
@@ -129,12 +130,16 @@ def get_historical_features(
             full_feature_names=full_feature_names,
         )
 
-        # TODO: Infer min_timestamp and max_timestamp from entity_df
+        # Infer min and max timestamps from entity_df to limit data read in BigQuery SQL query
+        min_timestamp, max_timestamp = _get_entity_df_timestamp_bounds(
+            client, str(table.reference), entity_df_event_timestamp_col
+        )
+
         # Generate the BigQuery SQL query from the query context
         query = build_point_in_time_query(
             query_context,
-            min_timestamp=datetime.now() - timedelta(days=365),
-            max_timestamp=datetime.now() + timedelta(days=1),
+            min_timestamp=min_timestamp,
+            max_timestamp=max_timestamp,
             left_table_query_string=str(table.reference),
             entity_df_event_timestamp_col=entity_df_event_timestamp_col,
             full_feature_names=full_feature_names,
Expand Down Expand Up @@ -374,6 +379,28 @@ def _upload_entity_df_into_bigquery(
return table


def _get_entity_df_timestamp_bounds(
client: Client, entity_df_bq_table: str, event_timestamp_col: str,
):

boundary_df = (
client.query(
f"""
SELECT
MIN({event_timestamp_col}) AS min_timestamp,
MAX({event_timestamp_col}) AS max_timestamp
FROM {entity_df_bq_table}
"""
)
.result()
.to_dataframe()
)

min_timestamp = boundary_df.loc[0, "min_timestamp"]
max_timestamp = boundary_df.loc[0, "max_timestamp"]
return min_timestamp, max_timestamp


def get_feature_view_query_context(
feature_refs: List[str],
feature_views: List[FeatureView],
Expand Down Expand Up @@ -435,8 +462,8 @@ def get_feature_view_query_context(

def build_point_in_time_query(
feature_view_query_contexts: List[FeatureViewQueryContext],
min_timestamp: datetime,
max_timestamp: datetime,
min_timestamp: Timestamp,
max_timestamp: Timestamp,
left_table_query_string: str,
entity_df_event_timestamp_col: str,
full_feature_names: bool = False,
Expand Down Expand Up @@ -533,6 +560,10 @@ def _get_bigquery_client(project: Optional[str] = None):
{{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM {{ featureview.table_subquery }}
WHERE {{ featureview.event_timestamp_column }} <= '{{max_timestamp}}'
{% if featureview.ttl == 0 %}{% else %}
AND {{ featureview.event_timestamp_column }} >= Timestamp_sub('{{min_timestamp}}', interval {{ featureview.ttl }} second)
{% endif %}
),
{{ featureview.name }}__base AS (
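
To make the new template clause concrete, here is a small, hypothetical rendering of just the added WHERE fragment using jinja2 directly. The column name, TTL, and bound values are sample inputs, not anything taken from a real feature view, and the real point-in-time query is far larger than this fragment.

```python
from types import SimpleNamespace

from jinja2 import BaseLoader, Environment

# Only the WHERE fragment added by this commit; the full template is much larger.
fragment = """
WHERE {{ featureview.event_timestamp_column }} <= '{{ max_timestamp }}'
{% if featureview.ttl == 0 %}{% else %}
AND {{ featureview.event_timestamp_column }} >= Timestamp_sub('{{ min_timestamp }}', interval {{ featureview.ttl }} second)
{% endif %}
"""

# Sample values (all hypothetical): a feature view with a one-day TTL and bounds
# inferred from the entity dataframe.
featureview = SimpleNamespace(event_timestamp_column="event_timestamp", ttl=86400)

rendered = (
    Environment(loader=BaseLoader())
    .from_string(fragment)
    .render(
        featureview=featureview,
        min_timestamp="2021-07-01 00:00:00+00:00",
        max_timestamp="2021-07-08 00:00:00+00:00",
    )
)
print(rendered)
# Roughly:
#   WHERE event_timestamp <= '2021-07-08 00:00:00+00:00'
#   AND event_timestamp >= Timestamp_sub('2021-07-01 00:00:00+00:00', interval 86400 second)
```

When a feature view has no TTL (ttl == 0), only the upper bound is applied, so all rows up to the entity dataframe's latest timestamp remain eligible for the point-in-time join.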
30 changes: 29 additions & 1 deletion sdk/python/tests/test_historical_retrieval.py
@@ -21,7 +21,10 @@
 from feast.feature import Feature
 from feast.feature_store import FeatureStore, _validate_feature_refs
 from feast.feature_view import FeatureView
-from feast.infra.offline_stores.bigquery import BigQueryOfflineStoreConfig
+from feast.infra.offline_stores.bigquery import (
+    BigQueryOfflineStoreConfig,
+    _get_entity_df_timestamp_bounds,
+)
 from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig
 from feast.infra.provider import DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL
 from feast.value_type import ValueType
@@ -595,6 +598,31 @@ def test_historical_features_from_bigquery_sources(
     )
 
 
+@pytest.mark.integration
+def test_timestamp_bound_inference_from_entity_df_using_bigquery():
+    start_date = datetime.now().replace(microsecond=0, second=0, minute=0)
+    (_, _, _, entity_df, start_date) = generate_entities(
+        start_date, infer_event_timestamp_col=True
+    )
+
+    table_id = "foo.table_id"
+    stage_orders_bigquery(entity_df, table_id)
+
+    client = bigquery.Client()
+    table = client.get_table(table=table_id)
+
+    # Ensure that the table expires after some time
+    table.expires = datetime.utcnow() + timedelta(minutes=30)
+    client.update_table(table, ["expires"])
+
+    min_timestamp, max_timestamp = _get_entity_df_timestamp_bounds(
+        client, str(table.reference), "e_ts"
+    )
+
+    assert min_timestamp.astimezone("UTC") == min(entity_df["e_ts"]).astimezone("UTC")
+    assert max_timestamp.astimezone("UTC") == max(entity_df["e_ts"]).astimezone("UTC")
+
+
 def test_feature_name_collision_on_historical_retrieval():
 
     # _validate_feature_refs is the function that checks for colliding feature names
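
A final note on the new test's assertions: BigQuery TIMESTAMP columns typically come back as timezone-aware values in UTC, while the locally generated entity dataframe may carry a different timezone, which is why both sides are explicitly normalized to UTC before comparison (see the commit message entry about setting retrieved timestamps to the same timezone). A hypothetical illustration with made-up values:

```python
import pandas as pd

# Hypothetical values: an entity_df timestamp carrying a local timezone, and the
# same instant as BigQuery typically returns it (timezone-aware, in UTC).
local_ts = pd.Timestamp("2021-07-08 12:00:00", tz="US/Pacific")
bq_ts = pd.Timestamp("2021-07-08 19:00:00", tz="UTC")

# Normalizing both sides to UTC (the test uses .astimezone("UTC")) keeps the
# equality check independent of how either value happens to be localized.
assert local_ts.astimezone("UTC") == bq_ts.astimezone("UTC")
```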
