Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add offline retrival integration tests using the universal repo #1769

Merged
merged 18 commits into from
Aug 16, 2021
23 changes: 18 additions & 5 deletions sdk/python/tests/integration/e2e/test_universal_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,24 @@
from pytz import utc

from feast import FeatureStore, FeatureView
from tests.integration.feature_repos.test_repo_configuration import parametrize_e2e_test
from tests.integration.feature_repos.test_repo_configuration import (
Environment,
parametrize_e2e_test,
)
from tests.integration.feature_repos.universal.entities import driver
from tests.integration.feature_repos.universal.feature_views import driver_feature_view


@parametrize_e2e_test
def test_e2e_consistency(fs: FeatureStore):
run_offline_online_store_consistency_test(fs)
def test_e2e_consistency(test_environment: Environment):
fs, fv = (
test_environment.feature_store,
driver_feature_view(test_environment.data_source),
)
entity = driver()
fs.apply([fv, entity])

run_offline_online_store_consistency_test(fs, fv)


def check_offline_and_online_features(
Expand Down Expand Up @@ -63,10 +75,11 @@ def check_offline_and_online_features(
assert math.isnan(df.to_dict()["value"][0])


def run_offline_online_store_consistency_test(fs: FeatureStore,) -> None:
def run_offline_online_store_consistency_test(
fs: FeatureStore, fv: FeatureView
) -> None:
now = datetime.utcnow()

fv = fs.get_feature_view("test_correctness")
full_feature_names = True
check_offline_store: bool = True

Expand Down
227 changes: 204 additions & 23 deletions sdk/python/tests/integration/feature_repos/test_repo_configuration.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
import tempfile
import uuid
from contextlib import contextmanager
from dataclasses import dataclass, replace
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Union
from typing import Dict, List, Optional, Union

import pytest
from attr import dataclass

from feast import FeatureStore, RepoConfig, importer
from feast import FeatureStore, FeatureView, RepoConfig, driver_test_data, importer
from feast.data_source import DataSource
from tests.data.data_creator import create_dataset
from tests.integration.feature_repos.universal.data_source_creator import (
DataSourceCreator,
)
from tests.integration.feature_repos.universal.entities import driver
from tests.integration.feature_repos.universal.entities import customer, driver
from tests.integration.feature_repos.universal.feature_views import (
correctness_feature_view,
create_customer_daily_profile_feature_view,
create_driver_hourly_stats_feature_view,
)


@dataclass
@dataclass(frozen=True, repr=True)
class TestRepoConfig:
"""
This class should hold all possible parameters that may need to be varied by individual tests.
Expand All @@ -30,20 +33,21 @@ class TestRepoConfig:
offline_store_creator: str = "tests.integration.feature_repos.universal.data_sources.file.FileDataSourceCreator"

full_feature_names: bool = True
infer_event_timestamp_col: bool = True


FULL_REPO_CONFIGS: List[TestRepoConfig] = [
TestRepoConfig(), # Local
TestRepoConfig(
provider="aws",
offline_store_creator="tests.integration.feature_repos.universal.data_sources.redshift.RedshiftDataSourceCreator",
online_store={"type": "dynamodb", "region": "us-west-2"},
),
TestRepoConfig(
provider="gcp",
offline_store_creator="tests.integration.feature_repos.universal.data_sources.bigquery.BigQueryDataSourceCreator",
online_store="datastore",
),
TestRepoConfig(
provider="aws",
offline_store_creator="tests.integration.feature_repos.universal.data_sources.redshift.RedshiftDataSourceCreator",
online_store={"type": "dynamodb", "region": "us-west-2"},
),
]


Expand All @@ -52,8 +56,128 @@ class TestRepoConfig:
PROVIDERS: List[str] = []


@dataclass
class Environment:
name: str
test_repo_config: TestRepoConfig
feature_store: FeatureStore
data_source: DataSource
data_source_creator: DataSourceCreator

end_date = datetime.now().replace(microsecond=0, second=0, minute=0)
start_date = end_date - timedelta(days=7)
before_start_date = end_date - timedelta(days=365)
after_end_date = end_date + timedelta(days=365)

customer_entities = list(range(1001, 1110))
customer_df = driver_test_data.create_customer_daily_profile_df(
customer_entities, start_date, end_date
)
_customer_feature_view: Optional[FeatureView] = None

driver_entities = list(range(5001, 5110))
driver_df = driver_test_data.create_driver_hourly_stats_df(
driver_entities, start_date, end_date
)
_driver_stats_feature_view: Optional[FeatureView] = None

orders_df = driver_test_data.create_orders_df(
customers=customer_entities,
drivers=driver_entities,
start_date=before_start_date,
end_date=after_end_date,
order_count=1000,
)
_orders_table: Optional[str] = None

def customer_feature_view(self) -> FeatureView:
if self._customer_feature_view is None:
customer_table_id = self.data_source_creator.get_prefixed_table_name(
self.name, "customer_profile"
)
ds = self.data_source_creator.create_data_sources(
customer_table_id,
self.customer_df,
event_timestamp_column="event_timestamp",
created_timestamp_column="created",
)
self._customer_feature_view = create_customer_daily_profile_feature_view(ds)
return self._customer_feature_view

def driver_stats_feature_view(self) -> FeatureView:
if self._driver_stats_feature_view is None:
driver_table_id = self.data_source_creator.get_prefixed_table_name(
self.name, "driver_hourly"
)
ds = self.data_source_creator.create_data_sources(
driver_table_id,
self.driver_df,
event_timestamp_column="event_timestamp",
created_timestamp_column="created",
)
self._driver_stats_feature_view = create_driver_hourly_stats_feature_view(
ds
)
return self._driver_stats_feature_view

def orders_table(self) -> Optional[str]:
if self._orders_table is None:
orders_table_id = self.data_source_creator.get_prefixed_table_name(
self.name, "orders"
)
ds = self.data_source_creator.create_data_sources(
orders_table_id,
self.orders_df,
event_timestamp_column="event_timestamp",
created_timestamp_column="created",
)
if hasattr(ds, "table_ref"):
self._orders_table = ds.table_ref
elif hasattr(ds, "table"):
self._orders_table = ds.table
return self._orders_table


def vary_full_feature_names(configs: List[TestRepoConfig]) -> List[TestRepoConfig]:
new_configs = []
for c in configs:
true_c = replace(c, full_feature_names=True)
false_c = replace(c, full_feature_names=False)
new_configs.extend([true_c, false_c])
achals marked this conversation as resolved.
Show resolved Hide resolved
return new_configs


def vary_infer_event_timestamp_col(
configs: List[TestRepoConfig],
) -> List[TestRepoConfig]:
new_configs = []
for c in configs:
true_c = replace(c, infer_event_timestamp_col=True)
false_c = replace(c, infer_event_timestamp_col=False)
new_configs.extend([true_c, false_c])
return new_configs


def vary_providers_for_offline_stores(
configs: List[TestRepoConfig],
) -> List[TestRepoConfig]:
new_configs = []
for c in configs:
if "FileDataSourceCreator" in c.offline_store_creator:
new_configs.append(c)
elif "RedshiftDataSourceCreator" in c.offline_store_creator:
for p in ["local", "aws"]:
new_configs.append(replace(c, provider=p))
elif "BigQueryDataSourceCreator" in c.offline_store_creator:
for p in ["local", "gcp"]:
new_configs.append(replace(c, provider=p))
return new_configs


@contextmanager
def construct_feature_store(test_repo_config: TestRepoConfig) -> FeatureStore:
def construct_test_environment(
test_repo_config: TestRepoConfig, create_and_apply: bool = False
) -> Environment:
"""
This method should take in the parameters from the test repo config and created a feature repo, apply it,
and return the constructed feature store object to callers.
Expand All @@ -74,8 +198,10 @@ def construct_feature_store(test_repo_config: TestRepoConfig) -> FeatureStore:

offline_creator: DataSourceCreator = importer.get_class_from_type(
module_name, config_class_name, "DataSourceCreator"
)()
ds = offline_creator.create_data_source(project, df)
)(project)
ds = offline_creator.create_data_sources(
project, df, field_mapping={"ts_1": "ts", "id": "driver_id"}
)
offline_store = offline_creator.create_offline_store_config()
online_store = test_repo_config.online_store

Expand All @@ -89,21 +215,76 @@ def construct_feature_store(test_repo_config: TestRepoConfig) -> FeatureStore:
repo_path=repo_dir_name,
)
fs = FeatureStore(config=config)
fv = correctness_feature_view(ds)
entity = driver()
fs.apply([fv, entity])
environment = Environment(
name=project,
test_repo_config=test_repo_config,
feature_store=fs,
data_source=ds,
data_source_creator=offline_creator,
)

yield fs
fvs = []
entities = []
try:
if create_and_apply:
entities.extend([driver(), customer()])
fvs.extend(
[
environment.driver_stats_feature_view(),
environment.customer_feature_view(),
]
)
fs.apply(fvs + entities)

fs.teardown()
offline_creator.teardown(project)
yield environment
finally:
offline_creator.teardown()
fs.teardown()


def parametrize_e2e_test(e2e_test):
"""
This decorator should be used for end-to-end tests. These tests are expected to be parameterized,
and receive an empty feature repo created for all supported configurations.

The decorator also ensures that sample data needed for the test is available in the relevant offline store.

Decorated tests should create and apply the objects needed by the tests, and perform any operations needed
(such as materialization and looking up feature values).

The decorator takes care of tearing down the feature store, as well as the sample data.
"""

@pytest.mark.integration
@pytest.mark.parametrize("config", FULL_REPO_CONFIGS, ids=lambda v: str(v))
def inner_test(config):
with construct_test_environment(config) as environment:
e2e_test(environment)

return inner_test


def parametrize_offline_retrieval_test(offline_retrieval_test):
"""
This decorator should be used for end-to-end tests. These tests are expected to be parameterized,
and receive an empty feature repo created for all supported configurations.

The decorator also ensures that sample data needed for the test is available in the relevant offline store.

Decorated tests should create and apply the objects needed by the tests, and perform any operations needed
(such as materialization and looking up feature values).

The decorator takes care of tearing down the feature store, as well as the sample data.
"""

configs = vary_providers_for_offline_stores(FULL_REPO_CONFIGS)
configs = vary_full_feature_names(configs)
configs = vary_infer_event_timestamp_col(configs)

@pytest.mark.integration
@pytest.mark.parametrize("config", FULL_REPO_CONFIGS, ids=lambda v: v.provider)
@pytest.mark.parametrize("config", configs, ids=lambda v: str(v))
def inner_test(config):
with construct_feature_store(config) as fs:
e2e_test(fs)
with construct_test_environment(config, create_and_apply=True) as environment:
offline_retrieval_test(environment)

return inner_test
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from abc import ABC, abstractmethod
from typing import Dict

import pandas as pd

Expand All @@ -8,12 +9,13 @@

class DataSourceCreator(ABC):
@abstractmethod
def create_data_source(
def create_data_sources(
self,
name: str,
destination: str,
df: pd.DataFrame,
event_timestamp_column="ts",
created_timestamp_column="created_ts",
field_mapping: Dict[str, str] = None,
) -> DataSource:
...

Expand All @@ -22,5 +24,9 @@ def create_offline_store_config(self) -> FeastConfigBaseModel:
...

@abstractmethod
def teardown(self, name: str):
def teardown(self):
...

@abstractmethod
def get_prefixed_table_name(self, name: str, suffix: str) -> str:
...
Loading