Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add offline retrival integration tests using the universal repo #1769

Merged
merged 18 commits into from
Aug 16, 2021
21 changes: 16 additions & 5 deletions sdk/python/tests/integration/e2e/test_universal_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,22 @@
from pytz import utc

from feast import FeatureStore, FeatureView
from tests.integration.feature_repos.test_repo_configuration import parametrize_e2e_test
from tests.integration.feature_repos.test_repo_configuration import (
Environment,
parametrize_e2e_test,
)
from tests.integration.feature_repos.universal.entities import driver
from tests.integration.feature_repos.universal.feature_views import driver_feature_view


@parametrize_e2e_test
def test_e2e_consistency(fs: FeatureStore):
run_offline_online_store_consistency_test(fs)
def test_e2e_consistency(test_environment: Environment):
fs, ds = test_environment.feature_store, test_environment.data_source
fv = driver_feature_view(ds)
entity = driver()
fs.apply([fv, entity])

run_offline_online_store_consistency_test(fs, fv)


def check_offline_and_online_features(
Expand Down Expand Up @@ -63,10 +73,11 @@ def check_offline_and_online_features(
assert math.isnan(df.to_dict()["value"][0])


def run_offline_online_store_consistency_test(fs: FeatureStore,) -> None:
def run_offline_online_store_consistency_test(
fs: FeatureStore, fv: FeatureView
) -> None:
now = datetime.utcnow()

fv = fs.get_feature_view("test_correctness")
full_feature_names = True
check_offline_store: bool = True

Expand Down
198 changes: 180 additions & 18 deletions sdk/python/tests/integration/feature_repos/test_repo_configuration.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,28 @@
import tempfile
import uuid
from contextlib import contextmanager
from dataclasses import dataclass, replace
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Union
from typing import Dict, List, Optional, Union

import pandas as pd
import pytest
from attr import dataclass

from feast import FeatureStore, RepoConfig, importer
from feast import FeatureStore, FeatureView, RepoConfig, driver_test_data, importer
from feast.data_source import DataSource
from tests.data.data_creator import create_dataset
from tests.integration.feature_repos.universal.data_source_creator import (
DataSourceCreator,
)
from tests.integration.feature_repos.universal.entities import driver
from tests.integration.feature_repos.universal.entities import customer, driver
from tests.integration.feature_repos.universal.feature_views import (
correctness_feature_view,
create_customer_daily_profile_feature_view,
create_driver_hourly_stats_feature_view,
)


@dataclass
@dataclass(frozen=True, repr=True)
class TestRepoConfig:
"""
This class should hold all possible parameters that may need to be varied by individual tests.
Expand All @@ -30,6 +34,7 @@ class TestRepoConfig:
offline_store_creator: str = "tests.integration.feature_repos.universal.data_sources.file.FileDataSourceCreator"

full_feature_names: bool = True
infer_event_timestamp_col: bool = True


FULL_REPO_CONFIGS: List[TestRepoConfig] = [
Expand All @@ -52,8 +57,108 @@ class TestRepoConfig:
PROVIDERS: List[str] = []


@dataclass
class Environment:
name: str
test_repo_config: TestRepoConfig
feature_store: FeatureStore
data_source: DataSource
data_source_creator: DataSourceCreator

end_date = datetime.now().replace(microsecond=0, second=0, minute=0)
start_date = end_date - timedelta(days=7)
before_start_date = end_date - timedelta(days=365)
after_end_date = end_date + timedelta(days=365)

customer_entities = list(range(1001, 1110))
customer_df = driver_test_data.create_customer_daily_profile_df(
customer_entities, start_date, end_date
)

driver_entities = list(range(5001, 5110))
driver_df = driver_test_data.create_driver_hourly_stats_df(
driver_entities, start_date, end_date
)

orders_df = driver_test_data.create_orders_df(
customers=customer_entities,
drivers=driver_entities,
start_date=before_start_date,
end_date=after_end_date,
order_count=1000,
)

def customer_fixtures(self) -> FeatureView:
achals marked this conversation as resolved.
Show resolved Hide resolved
customer_table_id = self.data_source_creator.get_prefixed_table_name(
self.name, "customer_profile"
)
ds = self.data_source_creator.create_data_sources(
customer_table_id,
self.customer_df,
event_timestamp_column="event_timestamp",
created_timestamp_column="created",
)
return create_customer_daily_profile_feature_view(ds)

def driver_stats_fixtures(self) -> FeatureView:
driver_table_id = self.data_source_creator.get_prefixed_table_name(
self.name, "driver_hourly"
)
ds = self.data_source_creator.create_data_sources(
driver_table_id,
self.driver_df,
event_timestamp_column="event_timestamp",
created_timestamp_column="created",
)
return create_driver_hourly_stats_feature_view(ds)

def order_fixtures(self) -> pd.DataFrame:
return self.orders_df

def orders_sql_fixtures(self) -> Optional[str]:
achals marked this conversation as resolved.
Show resolved Hide resolved
pass


def vary_full_feature_names(configs: List[TestRepoConfig]) -> List[TestRepoConfig]:
new_configs = []
for c in configs:
true_c = replace(c, full_feature_names=True)
false_c = replace(c, full_feature_names=False)
new_configs.extend([true_c, false_c])
achals marked this conversation as resolved.
Show resolved Hide resolved
return new_configs


def vary_infer_event_timestamp_col(
configs: List[TestRepoConfig],
) -> List[TestRepoConfig]:
new_configs = []
for c in configs:
true_c = replace(c, infer_event_timestamp_col=True)
false_c = replace(c, infer_event_timestamp_col=False)
new_configs.extend([true_c, false_c])
return new_configs


def vary_providers_for_offline_stores(
configs: List[TestRepoConfig],
) -> List[TestRepoConfig]:
new_configs = []
for c in configs:
if "FileDataSourceCreator" in c.offline_store_creator:
new_configs.append(c)
elif "RedshiftDataSourceCreator" in c.offline_store_creator:
for p in ["local", "aws"]:
new_configs.append(replace(c, provider=p))
elif "BigQueryDataSourceCreator" in c.offline_store_creator:
for p in ["local", "gcp", "gcp_custom_offline_config"]:
new_configs.append(replace(c, provider=p))
return new_configs


@contextmanager
def construct_feature_store(test_repo_config: TestRepoConfig) -> FeatureStore:
def construct_test_environment(
test_repo_config: TestRepoConfig, create_and_apply: bool = False
) -> Environment:
"""
This method should take in the parameters from the test repo config and created a feature repo, apply it,
and return the constructed feature store object to callers.
Expand All @@ -74,8 +179,8 @@ def construct_feature_store(test_repo_config: TestRepoConfig) -> FeatureStore:

offline_creator: DataSourceCreator = importer.get_class_from_type(
module_name, config_class_name, "DataSourceCreator"
)()
ds = offline_creator.create_data_source(project, df)
)(project)
ds = offline_creator.create_data_sources(project, df)
offline_store = offline_creator.create_offline_store_config()
online_store = test_repo_config.online_store

Expand All @@ -89,21 +194,78 @@ def construct_feature_store(test_repo_config: TestRepoConfig) -> FeatureStore:
repo_path=repo_dir_name,
)
fs = FeatureStore(config=config)
fv = correctness_feature_view(ds)
entity = driver()
fs.apply([fv, entity])
environment = Environment(
name=project,
test_repo_config=test_repo_config,
feature_store=fs,
data_source=ds,
data_source_creator=offline_creator,
)

yield fs
fvs = []
entities = []
try:
if create_and_apply:
fvs = []
entities.extend([driver(), customer()])
fvs.extend(
[
environment.driver_stats_fixtures(),
environment.customer_fixtures(),
]
)
fs.apply(fvs + entities)

fs.teardown()
offline_creator.teardown(project)
yield environment
finally:
if create_and_apply:
offline_creator.teardown(project)
achals marked this conversation as resolved.
Show resolved Hide resolved
fs.teardown()


def parametrize_e2e_test(e2e_test):
"""
This decorator should be used for end-to-end tests. These tests are expected to be parameterized,
and receive an empty feature repo created for all supported configurations.

The decorator also ensures that sample data needed for the test is available in the relevant offline store.

Decorated tests should create and apply the objects needed by the tests, and perform any operations needed
(such as materialization and looking up feature values).

The decorator takes care of tearing down the feature store, as well as the sample data.
"""

@pytest.mark.integration
@pytest.mark.parametrize("config", FULL_REPO_CONFIGS, ids=lambda v: str(v))
def inner_test(config):
with construct_test_environment(config) as environment:
e2e_test(environment)

return inner_test


def parametrize_offline_retrieval_test(offline_retrieval_test):
"""
This decorator should be used for end-to-end tests. These tests are expected to be parameterized,
and receive an empty feature repo created for all supported configurations.

The decorator also ensures that sample data needed for the test is available in the relevant offline store.

Decorated tests should create and apply the objects needed by the tests, and perform any operations needed
(such as materialization and looking up feature values).

The decorator takes care of tearing down the feature store, as well as the sample data.
"""

configs = vary_providers_for_offline_stores([TestRepoConfig()])
configs = vary_full_feature_names(configs)
configs = vary_infer_event_timestamp_col(configs)

@pytest.mark.integration
@pytest.mark.parametrize("config", FULL_REPO_CONFIGS, ids=lambda v: v.provider)
@pytest.mark.parametrize("config", configs, ids=lambda v: str(v))
def inner_test(config):
with construct_feature_store(config) as fs:
e2e_test(fs)
with construct_test_environment(config, create_and_apply=True) as environment:
offline_retrieval_test(environment)

return inner_test
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@

class DataSourceCreator(ABC):
@abstractmethod
def create_data_source(
def create_data_sources(
self,
name: str,
destination: str,
df: pd.DataFrame,
event_timestamp_column="ts",
created_timestamp_column="created_ts",
Expand All @@ -22,5 +22,9 @@ def create_offline_store_config(self) -> FeastConfigBaseModel:
...

@abstractmethod
def teardown(self, name: str):
def teardown(self):
...

@abstractmethod
def get_prefixed_table_name(self, name: str, suffix: str) -> str:
...
Loading