Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add interfaces for batch materialization engine #2901

Merged
merged 11 commits into from
Jul 6, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions sdk/python/feast/infra/materialization/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from .batch_materialization_engine import (
BatchMaterializationEngine,
MaterializationJob,
MaterializationTask,
)
from .local_engine import LocalMaterializationEngine

__all__ = [
felixwang9817 marked this conversation as resolved.
Show resolved Hide resolved
"MaterializationJob",
"MaterializationTask",
"BatchMaterializationEngine",
"LocalMaterializationEngine",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import enum
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, List, Optional, Sequence, Union

from tqdm import tqdm

from feast.batch_feature_view import BatchFeatureView
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.infra.offline_stores.offline_store import OfflineStore
from feast.infra.online_stores.online_store import OnlineStore
from feast.registry import BaseRegistry
from feast.repo_config import RepoConfig
from feast.stream_feature_view import StreamFeatureView


@dataclass
class MaterializationTask:
"""
A MaterializationTask represents a unit of data that needs to be materialized from an
offline store to an online store.
"""

project: str
feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView]
start_time: datetime
end_time: datetime
tqdm_builder: Callable[[int], tqdm]


class MaterializationJobStatus(enum.Enum):
WAITING = 1
RUNNING = 2
AVAILABLE = 3
ERROR = 4
CANCELLING = 5
CANCELLED = 6
SUCCEEDED = 7


class MaterializationJob(ABC):
"""
MaterializationJob represents an ongoing or executed process that's materialization data as per the
achals marked this conversation as resolved.
Show resolved Hide resolved
definition of a materialization task.
"""

task: MaterializationTask

@abstractmethod
def status(self) -> MaterializationJobStatus:
...

@abstractmethod
def error(self) -> Optional[BaseException]:
...

@abstractmethod
def should_be_retried(self) -> bool:
...

@abstractmethod
def job_id(self) -> str:
...

@abstractmethod
def url(self) -> Optional[str]:
...


class BatchMaterializationEngine(ABC):
def __init__(
self,
*,
repo_config: RepoConfig,
offline_store: OfflineStore,
online_store: OnlineStore,
**kwargs,
):
self.repo_config = repo_config
self.offline_store = offline_store
self.online_store = online_store

@abstractmethod
def update(
self,
project: str,
views_to_delete: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
],
views_to_keep: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
):
"""This method ensures that any necessary infrastructure or resources needed by the
engine are set up ahead of materialization."""

@abstractmethod
def materialize(
self, registry: BaseRegistry, tasks: List[MaterializationTask]
) -> List[MaterializationJob]:
"""
Materialize data from the offline store to the online store for this feature repo.
Args:
registry: The feast registry containing the applied feature views.
tasks: A list of individual materialization tasks.
Returns:
A list of materialization jobs representing each task.
"""
...

@abstractmethod
def teardown_infra(
self,
project: str,
fvs: Sequence[Union[BatchFeatureView, StreamFeatureView, FeatureView]],
entities: Sequence[Entity],
):
"""This method ensures that any infrastructure or resources set up by ``update()``are torn down."""
185 changes: 185 additions & 0 deletions sdk/python/feast/infra/materialization/local_engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, List, Literal, Optional, Sequence, Union

from tqdm import tqdm

from feast.batch_feature_view import BatchFeatureView
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.infra.offline_stores.offline_store import OfflineStore
from feast.infra.online_stores.online_store import OnlineStore
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.stream_feature_view import StreamFeatureView

from ...registry import BaseRegistry
from ...utils import (
_convert_arrow_to_proto,
_get_column_names,
_run_pyarrow_field_mapping,
)
from .batch_materialization_engine import (
BatchMaterializationEngine,
MaterializationJob,
MaterializationJobStatus,
MaterializationTask,
)

DEFAULT_BATCH_SIZE = 10_000


class LocalMaterializationEngineConfig(FeastConfigBaseModel):
"""Batch Materialization Engine config for local in-process engine"""

type: Literal["local"] = "local"
""" Type selector"""


@dataclass
class LocalMaterializationJob(MaterializationJob):
def __init__(
self,
job_id: str,
status: MaterializationJobStatus,
error: Optional[BaseException] = None,
) -> None:
super().__init__()
self._job_id: str = job_id
achals marked this conversation as resolved.
Show resolved Hide resolved
self._status: MaterializationJobStatus = status
self._error: Optional[BaseException] = error

def status(self) -> MaterializationJobStatus:
return self._status

def error(self) -> Optional[BaseException]:
return self._error

def should_be_retried(self) -> bool:
return False

def job_id(self) -> str:
return self._job_id

def url(self) -> Optional[str]:
return None


class LocalMaterializationEngine(BatchMaterializationEngine):
def update(
self,
project: str,
views_to_delete: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
],
views_to_keep: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
):
# Nothing to set up.
achals marked this conversation as resolved.
Show resolved Hide resolved
pass

def teardown_infra(
self,
project: str,
fvs: Sequence[Union[BatchFeatureView, StreamFeatureView, FeatureView]],
entities: Sequence[Entity],
):
# Nothing to tear down.
pass

def __init__(
self,
*,
repo_config: RepoConfig,
offline_store: OfflineStore,
online_store: OnlineStore,
**kwargs,
):
super().__init__(
repo_config=repo_config,
offline_store=offline_store,
online_store=online_store,
**kwargs,
)

def materialize(
self, registry, tasks: List[MaterializationTask]
) -> List[MaterializationJob]:
return [
self._materialize_one(
registry,
task.feature_view,
task.start_time,
task.end_time,
task.project,
task.tqdm_builder,
)
for task in tasks
]

def _materialize_one(
self,
registry: BaseRegistry,
feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView],
start_date: datetime,
end_date: datetime,
project: str,
tqdm_builder: Callable[[int], tqdm],
):
entities = []
for entity_name in feature_view.entities:
entities.append(registry.get_entity(entity_name, project))

(
join_key_columns,
feature_name_columns,
timestamp_field,
created_timestamp_column,
) = _get_column_names(feature_view, entities)

job_id = f"{feature_view.name}-{start_date}-{end_date}"

try:
offline_job = self.offline_store.pull_latest_from_table_or_query(
config=self.repo_config,
data_source=feature_view.batch_source,
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
timestamp_field=timestamp_field,
created_timestamp_column=created_timestamp_column,
start_date=start_date,
end_date=end_date,
)

table = offline_job.to_arrow()

if feature_view.batch_source.field_mapping is not None:
table = _run_pyarrow_field_mapping(
table, feature_view.batch_source.field_mapping
)

join_key_to_value_type = {
entity.name: entity.dtype.to_value_type()
for entity in feature_view.entity_columns
}

with tqdm_builder(table.num_rows) as pbar:
for batch in table.to_batches(DEFAULT_BATCH_SIZE):
rows_to_write = _convert_arrow_to_proto(
batch, feature_view, join_key_to_value_type
)
self.online_store.online_write_batch(
self.repo_config,
feature_view,
rows_to_write,
lambda x: pbar.update(x),
)
return LocalMaterializationJob(
job_id=job_id, status=MaterializationJobStatus.SUCCEEDED
)
except BaseException as e:
return LocalMaterializationJob(
job_id=job_id, status=MaterializationJobStatus.ERROR, error=e
)
8 changes: 4 additions & 4 deletions sdk/python/feast/infra/offline_stores/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@
DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL,
get_pyarrow_schema_from_batch_source,
)
from feast.infra.provider import (
_get_requested_feature_views_to_features_dict,
_run_dask_field_mapping,
)
from feast.registry import BaseRegistry
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.saved_dataset import SavedDatasetStorage
from feast.usage import log_exceptions_and_usage
from feast.utils import (
_get_requested_feature_views_to_features_dict,
_run_dask_field_mapping,
)


class FileOfflineStoreConfig(FeastConfigBaseModel):
Expand Down
3 changes: 1 addition & 2 deletions sdk/python/feast/infra/offline_stores/offline_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@
from feast.feature_view import FeatureView
from feast.importer import import_class
from feast.infra.offline_stores.offline_store import OfflineStore
from feast.infra.provider import _get_requested_feature_views_to_features_dict
from feast.registry import BaseRegistry
from feast.repo_config import RepoConfig
from feast.type_map import feast_value_type_to_pa
from feast.utils import to_naive_utc
from feast.utils import _get_requested_feature_views_to_features_dict, to_naive_utc

DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL = "event_timestamp"

Expand Down
Loading