Fix time zone issue with get_historical_features (#1475)
* Fix time zone issue with get_historical_features

Signed-off-by: Tsotne Tabidze <[email protected]>

* Fix dtypes

Signed-off-by: Tsotne Tabidze <[email protected]>
Tsotne Tabidze authored Apr 19, 2021
1 parent dd73c6e commit 307f110
Showing 3 changed files with 114 additions and 56 deletions.
31 changes: 29 additions & 2 deletions sdk/python/feast/driver_test_data.py
@@ -1,10 +1,31 @@
 # This module generates dummy data to be used for tests and examples.
+from enum import Enum
+
 import numpy as np
 import pandas as pd
+from pytz import FixedOffset, timezone, utc

 from feast.infra.provider import ENTITY_DF_EVENT_TIMESTAMP_COL


+class EventTimestampType(Enum):
+    TZ_NAIVE = 0
+    TZ_AWARE_UTC = 1
+    TZ_AWARE_FIXED_OFFSET = 2
+    TZ_AWARE_US_PACIFIC = 3
+
+
+def _convert_event_timestamp(event_timestamp: pd.Timestamp, t: EventTimestampType):
+    if t == EventTimestampType.TZ_NAIVE:
+        return event_timestamp
+    elif t == EventTimestampType.TZ_AWARE_UTC:
+        return event_timestamp.replace(tzinfo=utc)
+    elif t == EventTimestampType.TZ_AWARE_FIXED_OFFSET:
+        return event_timestamp.replace(tzinfo=utc).astimezone(FixedOffset(60))
+    elif t == EventTimestampType.TZ_AWARE_US_PACIFIC:
+        return event_timestamp.replace(tzinfo=utc).astimezone(timezone("US/Pacific"))
+
+
 def create_orders_df(
     customers, drivers, start_date, end_date, order_count
 ) -> pd.DataFrame:
@@ -23,9 +44,15 @@ def create_orders_df(
     df["driver_id"] = np.random.choice(drivers, order_count)
     df["customer_id"] = np.random.choice(customers, order_count)
     df["order_is_success"] = np.random.randint(0, 2, size=order_count).astype(np.int32)
+
     df[ENTITY_DF_EVENT_TIMESTAMP_COL] = [
-        pd.Timestamp(dt, unit="ms", tz="UTC").round("ms")
-        for dt in pd.date_range(start=start_date, end=end_date, periods=order_count)
+        _convert_event_timestamp(
+            pd.Timestamp(dt, unit="ms", tz="UTC").round("ms"),
+            EventTimestampType(idx % 4),
+        )
+        for idx, dt in enumerate(
+            pd.date_range(start=start_date, end=end_date, periods=order_count)
+        )
     ]
     df.sort_values(
         by=[ENTITY_DF_EVENT_TIMESTAMP_COL, "order_id", "driver_id", "customer_id"],
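
For reference, a minimal sketch (not part of the commit) of what the four EventTimestampType variants produce for a single instant. The orders dataframe now cycles through all four, so downstream retrieval must treat them as the same moment in time:

import pandas as pd
from pytz import FixedOffset, timezone, utc

ts = pd.Timestamp("2021-04-12 10:59:42").round("ms")

tz_naive = ts                                            # 2021-04-12 10:59:42 (no zone attached)
tz_utc = ts.replace(tzinfo=utc)                          # 2021-04-12 10:59:42+00:00
tz_fixed = tz_utc.astimezone(FixedOffset(60))            # 2021-04-12 11:59:42+01:00
tz_pacific = tz_utc.astimezone(timezone("US/Pacific"))   # 2021-04-12 03:59:42-07:00

# The three tz-aware values denote the same instant; only the rendering differs.
assert tz_utc == tz_fixed == tz_pacific
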
16 changes: 13 additions & 3 deletions sdk/python/feast/infra/offline_stores/file.py
@@ -60,10 +60,20 @@ def evaluate_historical_retrieval():
             entity_df[ENTITY_DF_EVENT_TIMESTAMP_COL] = entity_df[
                 ENTITY_DF_EVENT_TIMESTAMP_COL
             ].apply(lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc))
-            # Sort entity dataframe prior to join, and create a copy to prevent modifying the original
-            entity_df_with_features = entity_df.sort_values(
+
+            # Create a copy of entity_df to prevent modifying the original
+            entity_df_with_features = entity_df.copy()
+
+            # Convert event timestamp column to datetime and normalize time zone to UTC
+            # This is necessary to avoid issues with pd.merge_asof
+            entity_df_with_features[ENTITY_DF_EVENT_TIMESTAMP_COL] = pd.to_datetime(
+                entity_df_with_features[ENTITY_DF_EVENT_TIMESTAMP_COL], utc=True
+            )
+
+            # Sort event timestamp values
+            entity_df_with_features = entity_df_with_features.sort_values(
                 ENTITY_DF_EVENT_TIMESTAMP_COL
-            ).copy()
+            )

             # Load feature view data from sources and join them incrementally
             for feature_view, features in feature_views_to_features.items():
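
The normalization matters because pd.merge_asof refuses to join keys whose time zone dtypes disagree. A minimal sketch (not from the PR; toy data) of the failure mode and the fix:

import pandas as pd

# Entity rows may arrive tz-naive, or tz-aware in an arbitrary zone
left = pd.DataFrame({"ts": [pd.Timestamp("2021-04-12 10:00:00")]})            # tz-naive
right = pd.DataFrame({"ts": [pd.Timestamp("2021-04-12 09:00:00", tz="UTC")],  # tz-aware
                      "value": [1]})

try:
    pd.merge_asof(left, right, on="ts")
except Exception as e:
    print(f"merge_asof failed: {e}")  # incompatible merge keys (naive vs UTC)

# Normalizing both sides to UTC, as the fix does, makes the join well-defined
left["ts"] = pd.to_datetime(left["ts"], utc=True)
right["ts"] = pd.to_datetime(right["ts"], utc=True)
print(pd.merge_asof(left, right, on="ts"))  # matches the 09:00 row
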
123 changes: 72 additions & 51 deletions sdk/python/tests/test_historical_retrieval.py
@@ -10,8 +10,10 @@
 import pytest
 from google.cloud import bigquery
 from pandas.testing import assert_frame_equal
+from pytz import utc

 import feast.driver_test_data as driver_data
+from feast import utils
 from feast.data_source import BigQuerySource, FileSource
 from feast.entity import Entity
 from feast.feature import Feature
@@ -98,74 +100,93 @@ def create_customer_daily_profile_feature_view(source):
     return customer_profile_feature_view


+# Converts the given column of the pandas records to UTC timestamps
+def convert_timestamp_records_to_utc(records, column):
+    for record in records:
+        record[column] = utils.make_tzaware(record[column]).astimezone(utc)
+    return records
+
+
+# Find the latest record in the given time range and filter
+def find_asof_record(records, ts_key, ts_start, ts_end, filter_key, filter_value):
+    found_record = {}
+    for record in records:
+        if record[filter_key] == filter_value and ts_start <= record[ts_key] <= ts_end:
+            if not found_record or found_record[ts_key] < record[ts_key]:
+                found_record = record
+    return found_record
+
+
 def get_expected_training_df(
     customer_df: pd.DataFrame,
     customer_fv: FeatureView,
     driver_df: pd.DataFrame,
     driver_fv: FeatureView,
     orders_df: pd.DataFrame,
 ):
-    expected_orders_df = orders_df.copy().sort_values(ENTITY_DF_EVENT_TIMESTAMP_COL)
-    expected_drivers_df = driver_df.copy().sort_values(
-        driver_fv.input.event_timestamp_column
+    # Convert all pandas dataframes into records with UTC timestamps
+    order_records = convert_timestamp_records_to_utc(
+        orders_df.to_dict("records"), "event_timestamp"
     )
-    expected_orders_with_drivers = pd.merge_asof(
-        expected_orders_df,
-        expected_drivers_df[
-            [
-                driver_fv.input.event_timestamp_column,
-                "driver_id",
-                "conv_rate",
-                "avg_daily_trips",
-            ]
-        ],
-        left_on=ENTITY_DF_EVENT_TIMESTAMP_COL,
-        right_on=driver_fv.input.event_timestamp_column,
-        by=["driver_id"],
-        tolerance=driver_fv.ttl,
+    driver_records = convert_timestamp_records_to_utc(
+        driver_df.to_dict("records"), driver_fv.input.event_timestamp_column
     )

-    expected_orders_with_drivers.drop(
-        columns=[driver_fv.input.event_timestamp_column], inplace=True
+    customer_records = convert_timestamp_records_to_utc(
+        customer_df.to_dict("records"), customer_fv.input.event_timestamp_column
     )

-    expected_customers_df = customer_df.copy().sort_values(
-        [customer_fv.input.event_timestamp_column]
-    )
-    expected_df = pd.merge_asof(
-        expected_orders_with_drivers,
-        expected_customers_df[
-            [
-                customer_fv.input.event_timestamp_column,
-                "customer_id",
-                "current_balance",
-                "avg_passenger_count",
-                "lifetime_trip_count",
-            ]
-        ],
-        left_on=ENTITY_DF_EVENT_TIMESTAMP_COL,
-        right_on=customer_fv.input.event_timestamp_column,
-        by=["customer_id"],
-        tolerance=customer_fv.ttl,
-    )
-    expected_df.drop(columns=[driver_fv.input.event_timestamp_column], inplace=True)
+    # Manually do point-in-time join of orders to drivers and customers records
+    for order_record in order_records:
+        driver_record = find_asof_record(
+            driver_records,
+            ts_key=driver_fv.input.event_timestamp_column,
+            ts_start=order_record["event_timestamp"] - driver_fv.ttl,
+            ts_end=order_record["event_timestamp"],
+            filter_key="driver_id",
+            filter_value=order_record["driver_id"],
+        )
+        customer_record = find_asof_record(
+            customer_records,
+            ts_key=customer_fv.input.event_timestamp_column,
+            ts_start=order_record["event_timestamp"] - customer_fv.ttl,
+            ts_end=order_record["event_timestamp"],
+            filter_key="customer_id",
+            filter_value=order_record["customer_id"],
+        )
+        order_record.update(
+            {
+                f"driver_stats__{k}": driver_record.get(k, None)
+                for k in ("conv_rate", "avg_daily_trips")
+            }
+        )
+        order_record.update(
+            {
+                f"customer_profile__{k}": customer_record.get(k, None)
+                for k in (
+                    "current_balance",
+                    "avg_passenger_count",
+                    "lifetime_trip_count",
+                )
+            }
+        )
+
+    # Convert records back to pandas dataframe
+    expected_df = pd.DataFrame(order_records)
+
+    # Move "datetime" column to front
+    current_cols = expected_df.columns.tolist()
+    current_cols.remove(ENTITY_DF_EVENT_TIMESTAMP_COL)
+    expected_df = expected_df[[ENTITY_DF_EVENT_TIMESTAMP_COL] + current_cols]

-    # Rename columns to have double underscore
-    expected_df.rename(
-        inplace=True,
-        columns={
-            "conv_rate": "driver_stats__conv_rate",
-            "avg_daily_trips": "driver_stats__avg_daily_trips",
-            "current_balance": "customer_profile__current_balance",
-            "avg_passenger_count": "customer_profile__avg_passenger_count",
-            "lifetime_trip_count": "customer_profile__lifetime_trip_count",
-        },
-    )
+    # Cast some columns to expected types, since we lose information when converting pandas DFs into Python objects.
+    expected_df["order_is_success"] = expected_df["order_is_success"].astype("int32")
+    expected_df["customer_profile__current_balance"] = expected_df[
+        "customer_profile__current_balance"
+    ].astype("float32")
+    expected_df["customer_profile__avg_passenger_count"] = expected_df[
+        "customer_profile__avg_passenger_count"
+    ].astype("float32")

     return expected_df


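To make the helper concrete, a small sketch (not part of the commit; toy data, and it assumes find_asof_record from the diff above is in scope) of how the point-in-time lookup works for a single order: it picks the latest driver row at or before the order's timestamp, but no older than the feature view's TTL window.

from datetime import datetime, timedelta, timezone

driver_records = [
    {"event_timestamp": datetime(2021, 4, 10, tzinfo=timezone.utc), "driver_id": 1001, "conv_rate": 0.2},
    {"event_timestamp": datetime(2021, 4, 12, tzinfo=timezone.utc), "driver_id": 1001, "conv_rate": 0.5},
    {"event_timestamp": datetime(2021, 4, 14, tzinfo=timezone.utc), "driver_id": 1001, "conv_rate": 0.9},
]

order_ts = datetime(2021, 4, 13, tzinfo=timezone.utc)
ttl = timedelta(days=2)

match = find_asof_record(
    driver_records,
    ts_key="event_timestamp",
    ts_start=order_ts - ttl,   # April 11: the April 10 row is too old
    ts_end=order_ts,           # April 13: the April 14 row is in the future
    filter_key="driver_id",
    filter_value=1001,
)
assert match["conv_rate"] == 0.5  # the April 12 row: latest at or before April 13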
