Skip to content

Commit

Permalink
Add a feast plan command, and have CLI output differentiates between …
Browse files Browse the repository at this point in the history
…created, deleted and unchanged objects (#2147)

* Print changes in the repo objects in the new style during feast apply

Signed-off-by: Achal Shah <[email protected]>

* change color for deleted infra

Signed-off-by: Achal Shah <[email protected]>

* Add a feast plan command

Signed-off-by: Achal Shah <[email protected]>

* Add a feast plan command

Signed-off-by: Achal Shah <[email protected]>

* return from apply()

Signed-off-by: Achal Shah <[email protected]>

* Fix errors in doctests

Signed-off-by: Achal Shah <[email protected]>

* Fix deepcopy and use a clone method instead

Signed-off-by: Achal Shah <[email protected]>

* Fix registry clone

Signed-off-by: Achal Shah <[email protected]>

* CR updates

Signed-off-by: Achal Shah <[email protected]>
  • Loading branch information
achals authored Dec 16, 2021
1 parent 7eba23c commit ce243a4
Show file tree
Hide file tree
Showing 8 changed files with 298 additions and 89 deletions.
21 changes: 21 additions & 0 deletions sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
cli_check_repo,
generate_project_name,
init_repo,
plan,
registry_dump,
teardown,
)
Expand Down Expand Up @@ -351,6 +352,26 @@ def on_demand_feature_view_list(ctx: click.Context):
print(tabulate(table, headers=["NAME"], tablefmt="plain"))


@cli.command("plan", cls=NoOptionDefaultFormat)
@click.option(
"--skip-source-validation",
is_flag=True,
help="Don't validate the data sources by checking for that the tables exist.",
)
@click.pass_context
def plan_command(ctx: click.Context, skip_source_validation: bool):
"""
Create or update a feature store deployment
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
repo_config = load_repo_config(repo)
try:
plan(repo_config, repo, skip_source_validation)
except FeastProviderLoginError as e:
print(str(e))


@cli.command("apply", cls=NoOptionDefaultFormat)
@click.option(
"--skip-source-validation",
Expand Down
2 changes: 2 additions & 0 deletions sdk/python/feast/diff/FcoDiff.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class TransitionType(Enum):

@dataclass
class FcoDiff:
name: str
fco_type: str
current_fco: Any
new_fco: Any
fco_property_diffs: List[PropertyDiff]
Expand Down
119 changes: 114 additions & 5 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,26 @@
from collections import Counter, OrderedDict, defaultdict
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union, cast
from typing import (
Any,
Dict,
Iterable,
List,
NamedTuple,
Optional,
Set,
Tuple,
Union,
cast,
)

import pandas as pd
from colorama import Fore, Style
from tqdm import tqdm

from feast import feature_server, flags, flags_helper, utils
from feast.base_feature_view import BaseFeatureView
from feast.diff.FcoDiff import RegistryDiff
from feast.entity import Entity
from feast.errors import (
EntityNotFoundException,
Expand All @@ -51,6 +63,7 @@
from feast.infra.provider import Provider, RetrievalJob, get_provider
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.online_response import OnlineResponse, _infer_online_entity_rows
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.protos.feast.serving.ServingService_pb2 import (
GetOnlineFeaturesRequestV2,
GetOnlineFeaturesResponse,
Expand All @@ -66,6 +79,31 @@
warnings.simplefilter("once", DeprecationWarning)


class RepoContents(NamedTuple):
feature_views: Set[FeatureView]
on_demand_feature_views: Set[OnDemandFeatureView]
request_feature_views: Set[RequestFeatureView]
entities: Set[Entity]
feature_services: Set[FeatureService]

def to_registry_proto(self) -> RegistryProto:
registry_proto = RegistryProto()
registry_proto.entities.extend([e.to_proto() for e in self.entities])
registry_proto.feature_views.extend(
[fv.to_proto() for fv in self.feature_views]
)
registry_proto.on_demand_feature_views.extend(
[fv.to_proto() for fv in self.on_demand_feature_views]
)
registry_proto.request_feature_views.extend(
[fv.to_proto() for fv in self.request_feature_views]
)
registry_proto.feature_services.extend(
[fs.to_proto() for fs in self.feature_services]
)
return registry_proto


class FeatureStore:
"""
A FeatureStore object is used to define, create, and retrieve features.
Expand Down Expand Up @@ -357,6 +395,55 @@ def _get_features(self, features: Union[List[str], FeatureService],) -> List[str
_feature_refs = _features
return _feature_refs

@log_exceptions_and_usage
def plan(self, desired_repo_objects: RepoContents) -> RegistryDiff:
"""Dry-run registering objects to metadata store.
The plan method dry-runs registering one or more definitions (e.g., Entity, FeatureView), and produces
a list of all the changes the that would be introduced in the feature repo. The changes computed by the plan
command are for informational purpose, and are not actually applied to the registry.
Args:
objects: A single object, or a list of objects that are intended to be registered with the Feature Store.
objects_to_delete: A list of objects to be deleted from the registry.
partial: If True, apply will only handle the specified objects; if False, apply will also delete
all the objects in objects_to_delete.
Raises:
ValueError: The 'objects' parameter could not be parsed properly.
Examples:
Generate a plan adding an Entity and a FeatureView.
>>> from feast import FeatureStore, Entity, FeatureView, Feature, ValueType, FileSource, RepoConfig
>>> from feast.feature_store import RepoContents
>>> from datetime import timedelta
>>> fs = FeatureStore(repo_path="feature_repo")
>>> driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id")
>>> driver_hourly_stats = FileSource(
... path="feature_repo/data/driver_stats.parquet",
... event_timestamp_column="event_timestamp",
... created_timestamp_column="created",
... )
>>> driver_hourly_stats_view = FeatureView(
... name="driver_hourly_stats",
... entities=["driver_id"],
... ttl=timedelta(seconds=86400 * 1),
... batch_source=driver_hourly_stats,
... )
>>> diff = fs.plan(RepoContents({driver_hourly_stats_view}, set(), set(), {driver}, set())) # register entity and feature view
"""

current_registry_proto = (
self._registry.cached_registry_proto.__deepcopy__()
if self._registry.cached_registry_proto
else RegistryProto()
)

desired_registry_proto = desired_repo_objects.to_registry_proto()
diffs = Registry.diff_between(current_registry_proto, desired_registry_proto)
return diffs

@log_exceptions_and_usage
def apply(
self,
Expand Down Expand Up @@ -388,7 +475,7 @@ def apply(
]
] = None,
partial: bool = True,
):
) -> RegistryDiff:
"""Register objects to metadata store and update related infrastructure.
The apply method registers one or more definitions (e.g., Entity, FeatureView) and registers or updates these
Expand Down Expand Up @@ -424,18 +511,22 @@ def apply(
... ttl=timedelta(seconds=86400 * 1),
... batch_source=driver_hourly_stats,
... )
>>> fs.apply([driver_hourly_stats_view, driver]) # register entity and feature view
>>> diff = fs.apply([driver_hourly_stats_view, driver]) # register entity and feature view
"""
# TODO: Add locking

if not isinstance(objects, Iterable):
objects = [objects]

assert isinstance(objects, list)

if not objects_to_delete:
objects_to_delete = []

current_registry_proto = (
self._registry.cached_registry_proto.__deepcopy__()
if self._registry.cached_registry_proto
else RegistryProto()
)

# Separate all objects into entities, feature services, and different feature view types.
entities_to_update = [ob for ob in objects if isinstance(ob, Entity)]
views_to_update = [ob for ob in objects if isinstance(ob, FeatureView)]
Expand Down Expand Up @@ -533,6 +624,22 @@ def apply(
service.name, project=self.project, commit=False
)

new_registry_proto = (
self._registry.cached_registry_proto
if self._registry.cached_registry_proto
else RegistryProto()
)

diffs = Registry.diff_between(current_registry_proto, new_registry_proto)

entities_to_update = [ob for ob in objects if isinstance(ob, Entity)]
views_to_update = [ob for ob in objects if isinstance(ob, FeatureView)]

entities_to_delete = [ob for ob in objects_to_delete if isinstance(ob, Entity)]
views_to_delete = [
ob for ob in objects_to_delete if isinstance(ob, FeatureView)
]

self._get_provider().update_infra(
project=self.project,
tables_to_delete=views_to_delete if not partial else [],
Expand All @@ -544,6 +651,8 @@ def apply(

self._registry.commit()

return diffs

@log_exceptions_and_usage
def teardown(self):
"""Tears down all local and cloud resources for the feature store."""
Expand Down
88 changes: 71 additions & 17 deletions sdk/python/feast/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from feast.feature_view import FeatureView
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.registry_store import NoopRegistryStore
from feast.repo_config import RegistryConfig
from feast.request_feature_view import RequestFeatureView

Expand Down Expand Up @@ -128,32 +129,85 @@ def __init__(
else 0
)

def clone(self) -> "Registry":
new_registry = Registry(None, None)
new_registry.cached_registry_proto_ttl = timedelta(seconds=0)
new_registry.cached_registry_proto = (
self.cached_registry_proto.__deepcopy__()
if self.cached_registry_proto
else RegistryProto()
)
new_registry.cached_registry_proto_created = datetime.utcnow()
new_registry._registry_store = NoopRegistryStore()
return new_registry

# TODO(achals): This method needs to be filled out and used in the feast plan/apply methods.
@staticmethod
def diff_between(
current_registry: RegistryProto, new_registry: RegistryProto
) -> RegistryDiff:
diff = RegistryDiff()

# Handle Entities
(
entities_to_keep,
entities_to_delete,
entities_to_add,
) = tag_proto_objects_for_keep_delete_add(
current_registry.entities, new_registry.entities,
)
attribute_to_object_type_str = {
"entities": "entity",
"feature_views": "feature view",
"feature_tables": "feature table",
"on_demand_feature_views": "on demand feature view",
"request_feature_views": "request feature view",
"feature_services": "feature service",
}

for e in entities_to_add:
diff.add_fco_diff(FcoDiff(None, e, [], TransitionType.CREATE))
for e in entities_to_delete:
diff.add_fco_diff(FcoDiff(e, None, [], TransitionType.DELETE))
for object_type in [
"entities",
"feature_views",
"feature_tables",
"on_demand_feature_views",
"request_feature_views",
"feature_services",
]:
(
objects_to_keep,
objects_to_delete,
objects_to_add,
) = tag_proto_objects_for_keep_delete_add(
getattr(current_registry, object_type),
getattr(new_registry, object_type),
)

for e in objects_to_add:
diff.add_fco_diff(
FcoDiff(
e.spec.name,
attribute_to_object_type_str[object_type],
None,
e,
[],
TransitionType.CREATE,
)
)
for e in objects_to_delete:
diff.add_fco_diff(
FcoDiff(
e.spec.name,
attribute_to_object_type_str[object_type],
e,
None,
[],
TransitionType.DELETE,
)
)
for e in objects_to_keep:
diff.add_fco_diff(
FcoDiff(
e.spec.name,
attribute_to_object_type_str[object_type],
e,
e,
[],
TransitionType.UNCHANGED,
)
)

# Handle Feature Views
# Handle On Demand Feature Views
# Handle Request Feature Views
# Handle Feature Services
logger.info(f"Diff: {diff}")
return diff

def _initialize_registry(self):
Expand Down
11 changes: 11 additions & 0 deletions sdk/python/feast/registry_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,14 @@ def teardown(self):
Tear down the registry.
"""
pass


class NoopRegistryStore(RegistryStore):
def get_registry_proto(self) -> RegistryProto:
pass

def update_registry_proto(self, registry_proto: RegistryProto):
pass

def teardown(self):
pass
Loading

0 comments on commit ce243a4

Please sign in to comment.