fix: Fix for materializing entityless feature views in Snowflake (fea…
JohnLemmonMedely authored Feb 29, 2024
1 parent 42f37bb commit 1e64c77
Showing 2 changed files with 68 additions and 6 deletions.
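Background on the change: Feast materializes an entityless feature view with a single placeholder entity column named DUMMY_ENTITY_ID (the diff below imports that constant), so the old guard "if feature_view.entity_columns:" could not tell such a view apart from a regular one and went on to count distinct join keys in Snowflake. The fix branches on the placeholder name and sets entities_to_write to 1 directly. A minimal sketch of that distinction, not part of the commit (the is_entityless helper name is made up for illustration):

from feast.feature_view import DUMMY_ENTITY_ID, FeatureView


def is_entityless(feature_view: FeatureView) -> bool:
    # Entityless feature views still carry one placeholder entity column,
    # so truthiness of entity_columns is not a reliable test; compare the
    # first column's name against the DUMMY_ENTITY_ID sentinel instead.
    return bool(feature_view.entity_columns) and (
        feature_view.entity_columns[0].name == DUMMY_ENTITY_ID
    )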
sdk/python/feast/infra/materialization/snowflake_engine.py (6 additions & 6 deletions)
@@ -14,7 +14,7 @@
 import feast
 from feast.batch_feature_view import BatchFeatureView
 from feast.entity import Entity
-from feast.feature_view import FeatureView
+from feast.feature_view import DUMMY_ENTITY_ID, FeatureView
 from feast.infra.materialization.batch_materialization_engine import (
     BatchMaterializationEngine,
     MaterializationJob,
@@ -274,7 +274,11 @@ def _materialize_one(
 
         fv_latest_values_sql = offline_job.to_sql()
 
-        if feature_view.entity_columns:
+        if (
+            feature_view.entity_columns[0].name == DUMMY_ENTITY_ID
+        ):  # entityless Feature View's placeholder entity
+            entities_to_write = 1
+        else:
             join_keys = [entity.name for entity in feature_view.entity_columns]
             unique_entities = '"' + '", "'.join(join_keys) + '"'
 
@@ -287,10 +291,6 @@
 
             with GetSnowflakeConnection(self.repo_config.offline_store) as conn:
                 entities_to_write = conn.cursor().execute(query).fetchall()[0][0]
-        else:
-            entities_to_write = (
-                1  # entityless feature view has a placeholder entity
-            )
 
         if feature_view.batch_source.field_mapping is not None:
             fv_latest_mapped_values_sql = _run_snowflake_field_mapping(
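For readers skimming the hunks: in the non-entityless branch the engine quotes the join keys and counts distinct key combinations over the latest-values query (the query construction itself sits in the collapsed region between the two hunks above). A small self-contained illustration of that string handling, with made-up join keys and a stand-in for offline_job.to_sql(); the exact SQL shape in Feast may differ:

# Illustrative only: shows the join-key quoting from the diff and the general
# shape of a distinct-count query; the join keys and SQL below are stand-ins.
join_keys = ["driver_id", "customer_id"]  # example join keys, not from the commit
unique_entities = '"' + '", "'.join(join_keys) + '"'

fv_latest_values_sql = "SELECT * FROM example_latest_values"  # stand-in for offline_job.to_sql()

query = f"""
    SELECT COUNT(DISTINCT {unique_entities})
    FROM ({fv_latest_values_sql})
"""

print(query)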
sdk/python/tests/integration/materialization/test_snowflake.py (62 additions & 0 deletions)
@@ -185,3 +185,65 @@ def test_snowflake_materialization_consistency_internal_with_lists(
     finally:
         fs.teardown()
         snowflake_environment.data_source_creator.teardown()
+
+
+@pytest.mark.integration
+def test_snowflake_materialization_entityless_fv():
+    snowflake_config = IntegrationTestRepoConfig(
+        online_store=SNOWFLAKE_ONLINE_CONFIG,
+        offline_store_creator=SnowflakeDataSourceCreator,
+        batch_engine=SNOWFLAKE_ENGINE_CONFIG,
+    )
+    snowflake_environment = construct_test_environment(snowflake_config, None)
+
+    df = create_basic_driver_dataset()
+    entityless_df = df.drop("driver_id", axis=1)
+    ds = snowflake_environment.data_source_creator.create_data_source(
+        entityless_df,
+        snowflake_environment.feature_store.project,
+        field_mapping={"ts_1": "ts"},
+    )
+
+    fs = snowflake_environment.feature_store
+
+    # We include the driver entity so we can provide an entity ID when fetching features
+    driver = Entity(
+        name="driver_id",
+        join_keys=["driver_id"],
+    )
+
+    overall_stats_fv = FeatureView(
+        name="overall_hourly_stats",
+        entities=[],
+        ttl=timedelta(weeks=52),
+        source=ds,
+    )
+
+    try:
+        fs.apply([overall_stats_fv, driver])
+
+        # materialization is run in two steps and
+        # we use timestamp from generated dataframe as a split point
+        split_dt = df["ts_1"][4].to_pydatetime() - timedelta(seconds=1)
+
+        print(f"Split datetime: {split_dt}")
+
+        now = datetime.utcnow()
+
+        start_date = (now - timedelta(hours=5)).replace(tzinfo=utc)
+        end_date = split_dt
+        fs.materialize(
+            feature_views=[overall_stats_fv.name],
+            start_date=start_date,
+            end_date=end_date,
+        )
+
+        response_dict = fs.get_online_features(
+            [f"{overall_stats_fv.name}:value"],
+            [{"driver_id": 1}],  # Included because we need an entity
+        ).to_dict()
+        assert response_dict["value"] == [0.3]
+
+    finally:
+        fs.teardown()
+        snowflake_environment.data_source_creator.teardown()
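The new test mirrors the other Snowflake materialization tests in this file and carries the integration marker, so it requires real Snowflake credentials and the suite's integration-test configuration. Assuming that setup, an invocation along these lines would select just this test (the exact flags depend on how the repository normally drives its integration suite):

pytest sdk/python/tests/integration/materialization/test_snowflake.py -k test_snowflake_materialization_entityless_fv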
