Fix BQ historical retrieval with rows that got backfilled (#1744)
* Fix how latest feature is calculated using Window function

Signed-off-by: Matt Delacour <matt.delacour@shopify.com>

* Add simple test to replicate the backfill issue

Signed-off-by: Matt Delacour <matt.delacour@shopify.com>
MattDelac authored Aug 6, 2021
1 parent f663865 commit 54bbe5f
Showing 2 changed files with 147 additions and 12 deletions.
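
A note on the failure mode being fixed (an interpretation of the diff below, since the commit message is terse): the old query reduced each entity to MAX(event_timestamp) plus ANY_VALUE(created_timestamp) before joining back on both timestamps. When a row is backfilled, that is, it carries the newest event_timestamp but an older created timestamp, the reconstructed pair may not correspond to any real row, so the join can drop or mis-select the latest feature value. A minimal Python sketch with hypothetical values:

# Two rows for one entity; the second was backfilled
# (latest event_timestamp, but an earlier created timestamp).
rows = [
    {"event_timestamp": "2021-08-02", "created": "2021-08-03", "avg_daily_trips": 30},
    {"event_timestamp": "2021-08-03", "created": "2021-08-02", "avg_daily_trips": 40},
]

max_event = max(r["event_timestamp"] for r in rows)  # "2021-08-03"
any_created = rows[0]["created"]                     # ANY_VALUE may pick "2021-08-03"

# The reconstructed (event_timestamp, created) key matches no real row,
# so joining back on both columns loses the genuinely latest value (40).
match = [r for r in rows if (r["event_timestamp"], r["created"]) == (max_event, any_created)]
print(match)  # []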
sdk/python/feast/infra/offline_stores/bigquery.py (25 changes: 13 additions & 12 deletions)
@@ -430,20 +430,21 @@ def _get_bigquery_client(project: Optional[str] = None):
  Thus we only need to compute the latest timestamp of each feature.
  */
 {{ featureview.name }}__latest AS (
-    SELECT
-        {{featureview.name}}__entity_row_unique_id,
-        MAX(event_timestamp) AS event_timestamp
+    SELECT * EXCEPT(row_number)
+    FROM
+    (
+        SELECT *,
+            ROW_NUMBER() OVER(
+                PARTITION BY {{featureview.name}}__entity_row_unique_id
+                ORDER BY event_timestamp DESC{% if featureview.created_timestamp_column %},created_timestamp DESC{% endif %}
+            ) AS row_number,
+        FROM {{ featureview.name }}__base
         {% if featureview.created_timestamp_column %}
-        ,ANY_VALUE(created_timestamp) AS created_timestamp
+        INNER JOIN {{ featureview.name }}__dedup
+        USING ({{featureview.name}}__entity_row_unique_id, event_timestamp, created_timestamp)
         {% endif %}
-    FROM {{ featureview.name }}__base
-    {% if featureview.created_timestamp_column %}
-    INNER JOIN {{ featureview.name }}__dedup
-    USING ({{featureview.name}}__entity_row_unique_id, event_timestamp, created_timestamp)
-    {% endif %}
-    GROUP BY {{featureview.name}}__entity_row_unique_id
+    )
+    WHERE row_number = 1
 ),
 /*
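
For intuition, a minimal pandas sketch (not part of the commit) of the deduplication the new window-function query performs: within each entity partition, keep the row with the greatest event_timestamp, breaking backfill conflicts by the greatest created timestamp. Column names mirror the test added below; the values are made up.

import pandas as pd

# Hypothetical stand-in for the backfilled driver 1002 rows in the test below.
rows = pd.DataFrame(
    [
        {"driver_id": 1002, "avg_daily_trips": 30,
         "event_timestamp": pd.Timestamp("2021-08-02"), "created": pd.Timestamp("2021-08-03")},
        {"driver_id": 1002, "avg_daily_trips": 40,
         "event_timestamp": pd.Timestamp("2021-08-03"), "created": pd.Timestamp("2021-08-02")},
    ]
)

# Emulate ROW_NUMBER() OVER (PARTITION BY driver_id
#     ORDER BY event_timestamp DESC, created DESC) ... WHERE row_number = 1
latest = (
    rows.sort_values(["event_timestamp", "created"], ascending=False)
    .groupby("driver_id")
    .head(1)
)
print(latest["avg_daily_trips"].item())  # 40: the latest value survives the backfill

Because the surviving row is an actual source row, its (event_timestamp, created) pair always joins back cleanly, which the previous MAX/ANY_VALUE aggregation did not guarantee.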
@@ -992,3 +992,137 @@ def test_feature_name_collision_on_historical_retrieval():
"have different names."
)
assert str(error.value) == expected_error_message


@pytest.mark.integration
def test_historical_features_from_bigquery_sources_containing_backfills(capsys):
now = datetime.now().replace(microsecond=0, second=0, minute=0)
tomorrow = now + timedelta(days=1)

entity_dataframe = pd.DataFrame(
data=[
{"driver_id": 1001, "event_timestamp": now + timedelta(days=2)},
{"driver_id": 1002, "event_timestamp": now + timedelta(days=2)},
]
)

driver_stats_df = pd.DataFrame(
data=[
# Duplicated rows simple case
{
"driver_id": 1001,
"avg_daily_trips": 10,
"event_timestamp": now,
"created": tomorrow,
},
{
"driver_id": 1001,
"avg_daily_trips": 20,
"event_timestamp": tomorrow,
"created": tomorrow,
},
# Duplicated rows after a backfill
{
"driver_id": 1002,
"avg_daily_trips": 30,
"event_timestamp": now,
"created": tomorrow,
},
{
"driver_id": 1002,
"avg_daily_trips": 40,
"event_timestamp": tomorrow,
"created": now,
},
]
)

expected_df = pd.DataFrame(
data=[
{
"driver_id": 1001,
"event_timestamp": now + timedelta(days=2),
"avg_daily_trips": 20,
},
{
"driver_id": 1002,
"event_timestamp": now + timedelta(days=2),
"avg_daily_trips": 40,
},
]
)

bigquery_dataset = (
f"test_hist_retrieval_{int(time.time_ns())}_{random.randint(1000, 9999)}"
)

with BigQueryDataSet(bigquery_dataset), TemporaryDirectory() as temp_dir:
gcp_project = bigquery.Client().project

# Entity Dataframe SQL query
table_id = f"{bigquery_dataset}.orders"
stage_orders_bigquery(entity_dataframe, table_id)
entity_df_query = f"SELECT * FROM {gcp_project}.{table_id}"

# Driver Feature View
driver_table_id = f"{gcp_project}.{bigquery_dataset}.driver_hourly"
stage_driver_hourly_stats_bigquery_source(driver_stats_df, driver_table_id)

store = FeatureStore(
config=RepoConfig(
registry=os.path.join(temp_dir, "registry.db"),
project="".join(
random.choices(string.ascii_uppercase + string.digits, k=10)
),
provider="gcp",
offline_store=BigQueryOfflineStoreConfig(
type="bigquery", dataset=bigquery_dataset
),
)
)

driver = Entity(name="driver", join_key="driver_id", value_type=ValueType.INT64)
driver_fv = FeatureView(
name="driver_stats",
entities=["driver"],
features=[Feature(name="avg_daily_trips", dtype=ValueType.INT32)],
batch_source=BigQuerySource(
table_ref=driver_table_id,
event_timestamp_column="event_timestamp",
created_timestamp_column="created",
),
ttl=None,
)

store.apply([driver, driver_fv])

try:
job_from_sql = store.get_historical_features(
entity_df=entity_df_query,
features=["driver_stats:avg_daily_trips"],
full_feature_names=False,
)

start_time = datetime.utcnow()
actual_df_from_sql_entities = job_from_sql.to_df()
end_time = datetime.utcnow()
with capsys.disabled():
print(
str(
f"\nTime to execute job_from_sql.to_df() = '{(end_time - start_time)}'"
)
)

assert sorted(expected_df.columns) == sorted(
actual_df_from_sql_entities.columns
)
assert_frame_equal(
expected_df.sort_values(by=["driver_id"]).reset_index(drop=True),
actual_df_from_sql_entities[expected_df.columns]
.sort_values(by=["driver_id"])
.reset_index(drop=True),
check_dtype=False,
)

finally:
store.teardown()
