From 307f110987b939804ca665f934115a4d010118e6 Mon Sep 17 00:00:00 2001 From: Tsotne Tabidze Date: Mon, 19 Apr 2021 13:25:50 -0700 Subject: [PATCH] Fix time zone issue with get_historical_features (#1475) * Fix time zone issue with get_historical_features Signed-off-by: Tsotne Tabidze * Fix dtypes Signed-off-by: Tsotne Tabidze --- sdk/python/feast/driver_test_data.py | 31 ++++- sdk/python/feast/infra/offline_stores/file.py | 16 ++- sdk/python/tests/test_historical_retrieval.py | 123 ++++++++++-------- 3 files changed, 114 insertions(+), 56 deletions(-) diff --git a/sdk/python/feast/driver_test_data.py b/sdk/python/feast/driver_test_data.py index 0ac9b3f9b7..0166a68242 100644 --- a/sdk/python/feast/driver_test_data.py +++ b/sdk/python/feast/driver_test_data.py @@ -1,10 +1,31 @@ # This module generates dummy data to be used for tests and examples. +from enum import Enum + import numpy as np import pandas as pd +from pytz import FixedOffset, timezone, utc from feast.infra.provider import ENTITY_DF_EVENT_TIMESTAMP_COL +class EventTimestampType(Enum): + TZ_NAIVE = 0 + TZ_AWARE_UTC = 1 + TZ_AWARE_FIXED_OFFSET = 2 + TZ_AWARE_US_PACIFIC = 3 + + +def _convert_event_timestamp(event_timestamp: pd.Timestamp, t: EventTimestampType): + if t == EventTimestampType.TZ_NAIVE: + return event_timestamp + elif t == EventTimestampType.TZ_AWARE_UTC: + return event_timestamp.replace(tzinfo=utc) + elif t == EventTimestampType.TZ_AWARE_FIXED_OFFSET: + return event_timestamp.replace(tzinfo=utc).astimezone(FixedOffset(60)) + elif t == EventTimestampType.TZ_AWARE_US_PACIFIC: + return event_timestamp.replace(tzinfo=utc).astimezone(timezone("US/Pacific")) + + def create_orders_df( customers, drivers, start_date, end_date, order_count ) -> pd.DataFrame: @@ -23,9 +44,15 @@ def create_orders_df( df["driver_id"] = np.random.choice(drivers, order_count) df["customer_id"] = np.random.choice(customers, order_count) df["order_is_success"] = np.random.randint(0, 2, size=order_count).astype(np.int32) + 
df[ENTITY_DF_EVENT_TIMESTAMP_COL] = [ - pd.Timestamp(dt, unit="ms", tz="UTC").round("ms") - for dt in pd.date_range(start=start_date, end=end_date, periods=order_count) + _convert_event_timestamp( + pd.Timestamp(dt, unit="ms", tz="UTC").round("ms"), + EventTimestampType(idx % 4), + ) + for idx, dt in enumerate( + pd.date_range(start=start_date, end=end_date, periods=order_count) + ) ] df.sort_values( by=[ENTITY_DF_EVENT_TIMESTAMP_COL, "order_id", "driver_id", "customer_id"], diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index 7893f2c947..463591da4d 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -60,10 +60,20 @@ def evaluate_historical_retrieval(): entity_df[ENTITY_DF_EVENT_TIMESTAMP_COL] = entity_df[ ENTITY_DF_EVENT_TIMESTAMP_COL ].apply(lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc)) - # Sort entity dataframe prior to join, and create a copy to prevent modifying the original - entity_df_with_features = entity_df.sort_values( + + # Create a copy of entity_df to prevent modifying the original + entity_df_with_features = entity_df.copy() + + # Convert event timestamp column to datetime and normalize time zone to UTC + # This is necessary to avoid issues with pd.merge_asof + entity_df_with_features[ENTITY_DF_EVENT_TIMESTAMP_COL] = pd.to_datetime( + entity_df_with_features[ENTITY_DF_EVENT_TIMESTAMP_COL], utc=True + ) + + # Sort event timestamp values + entity_df_with_features = entity_df_with_features.sort_values( ENTITY_DF_EVENT_TIMESTAMP_COL - ).copy() + ) # Load feature view data from sources and join them incrementally for feature_view, features in feature_views_to_features.items(): diff --git a/sdk/python/tests/test_historical_retrieval.py b/sdk/python/tests/test_historical_retrieval.py index c67f59b0bc..2b0875e08b 100644 --- a/sdk/python/tests/test_historical_retrieval.py +++ 
def convert_timestamp_records_to_utc(records, column):
    """Convert ``record[column]`` to a UTC timestamp for every record.

    The records are modified in place and the same list is returned.

    Args:
        records: Iterable of mutable mappings (e.g. ``DataFrame.to_dict("records")``).
        column: Key of the timestamp value to normalize.

    Returns:
        The ``records`` object that was passed in.
    """
    for record in records:
        # NOTE(review): utils.make_tzaware presumably attaches a time zone to
        # naive values so that astimezone() is well defined - confirm.
        record[column] = utils.make_tzaware(record[column]).astimezone(utc)
    return records


def find_asof_record(records, ts_key, ts_start, ts_end, filter_key, filter_value):
    """Find the latest record within a time range that matches a filter.

    Args:
        records: Iterable of mappings to search.
        ts_key: Key holding each record's timestamp.
        ts_start: Inclusive lower bound for the timestamp.
        ts_end: Inclusive upper bound for the timestamp.
        filter_key: Key whose value must equal ``filter_value``.
        filter_value: Value a record must carry under ``filter_key``.

    Returns:
        The matching record with the greatest timestamp (the first one seen
        when several share it), or an empty dict when nothing matches.
    """
    candidates = (
        record
        for record in records
        if record[filter_key] == filter_value and ts_start <= record[ts_key] <= ts_end
    )
    # max() keeps the first maximal element, so ties resolve to the earliest
    # record in iteration order; default={} covers "no candidate".
    return max(candidates, key=lambda record: record[ts_key], default={})
driver_records = convert_timestamp_records_to_utc( + driver_df.to_dict("records"), driver_fv.input.event_timestamp_column ) - - expected_orders_with_drivers.drop( - columns=[driver_fv.input.event_timestamp_column], inplace=True + customer_records = convert_timestamp_records_to_utc( + customer_df.to_dict("records"), customer_fv.input.event_timestamp_column ) - expected_customers_df = customer_df.copy().sort_values( - [customer_fv.input.event_timestamp_column] - ) - expected_df = pd.merge_asof( - expected_orders_with_drivers, - expected_customers_df[ - [ - customer_fv.input.event_timestamp_column, - "customer_id", - "current_balance", - "avg_passenger_count", - "lifetime_trip_count", - ] - ], - left_on=ENTITY_DF_EVENT_TIMESTAMP_COL, - right_on=customer_fv.input.event_timestamp_column, - by=["customer_id"], - tolerance=customer_fv.ttl, - ) - expected_df.drop(columns=[driver_fv.input.event_timestamp_column], inplace=True) + # Manually do point-in-time join of orders to drivers and customers records + for order_record in order_records: + driver_record = find_asof_record( + driver_records, + ts_key=driver_fv.input.event_timestamp_column, + ts_start=order_record["event_timestamp"] - driver_fv.ttl, + ts_end=order_record["event_timestamp"], + filter_key="driver_id", + filter_value=order_record["driver_id"], + ) + customer_record = find_asof_record( + customer_records, + ts_key=customer_fv.input.event_timestamp_column, + ts_start=order_record["event_timestamp"] - customer_fv.ttl, + ts_end=order_record["event_timestamp"], + filter_key="customer_id", + filter_value=order_record["customer_id"], + ) + order_record.update( + { + f"driver_stats__{k}": driver_record.get(k, None) + for k in ("conv_rate", "avg_daily_trips") + } + ) + order_record.update( + { + f"customer_profile__{k}": customer_record.get(k, None) + for k in ( + "current_balance", + "avg_passenger_count", + "lifetime_trip_count", + ) + } + ) + + # Convert records back to pandas dataframe + expected_df = 
pd.DataFrame(order_records) # Move "datetime" column to front current_cols = expected_df.columns.tolist() current_cols.remove(ENTITY_DF_EVENT_TIMESTAMP_COL) expected_df = expected_df[[ENTITY_DF_EVENT_TIMESTAMP_COL] + current_cols] - # Rename columns to have double underscore - expected_df.rename( - inplace=True, - columns={ - "conv_rate": "driver_stats__conv_rate", - "avg_daily_trips": "driver_stats__avg_daily_trips", - "current_balance": "customer_profile__current_balance", - "avg_passenger_count": "customer_profile__avg_passenger_count", - "lifetime_trip_count": "customer_profile__lifetime_trip_count", - }, - ) + # Cast some columns to expected types, since we lose information when converting pandas DFs into Python objects. + expected_df["order_is_success"] = expected_df["order_is_success"].astype("int32") + expected_df["customer_profile__current_balance"] = expected_df[ + "customer_profile__current_balance" + ].astype("float32") + expected_df["customer_profile__avg_passenger_count"] = expected_df[ + "customer_profile__avg_passenger_count" + ].astype("float32") + return expected_df