Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix duplicate update infra #1990

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 86 additions & 12 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,16 +368,29 @@ def apply(
OnDemandFeatureView,
RequestFeatureView,
FeatureService,
FeatureTable,
List[
Union[
FeatureView,
OnDemandFeatureView,
RequestFeatureView,
Entity,
FeatureService,
FeatureTable,
]
],
],
objects_to_delete: List[
Union[
FeatureView,
OnDemandFeatureView,
RequestFeatureView,
Entity,
FeatureService,
FeatureTable,
]
] = [],
partial: bool = True,
commit: bool = True,
):
"""Register objects to metadata store and update related infrastructure.
Expand All @@ -389,6 +402,10 @@ def apply(

Args:
objects: A single object, or a list of objects that should be registered with the Feature Store.
objects_to_delete: A list of objects to be deleted from the registry and removed from the
provider's infrastructure. This deletion will only be performed if partial is set to False.
partial: If True, apply will only handle the specified objects; if False, apply will also delete
all the objects in objects_to_delete, and tear down any associated cloud resources.
commit: whether to commit changes to the registry

Raises:
Expand Down Expand Up @@ -421,11 +438,26 @@ def apply(

assert isinstance(objects, list)

# Separate all objects into entities, feature services, and different feature view types.
entities_to_update = [ob for ob in objects if isinstance(ob, Entity)]
views_to_update = [ob for ob in objects if isinstance(ob, FeatureView)]
request_views_to_update = [
ob for ob in objects if isinstance(ob, RequestFeatureView)
]
odfvs_to_update = [ob for ob in objects if isinstance(ob, OnDemandFeatureView)]
services_to_update = [ob for ob in objects if isinstance(ob, FeatureService)]
tables_to_update = [ob for ob in objects if isinstance(ob, FeatureTable)]

if len(entities_to_update) + len(views_to_update) + len(
request_views_to_update
) + len(odfvs_to_update) + len(services_to_update) + len(
tables_to_update
) != len(
objects
):
raise ValueError("Unknown object type provided as part of apply() call")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be a nicer error message here if we could point to which object was the unknown one.


# Validate all types of feature views.
if (
not flags_helper.enable_on_demand_feature_views(self.config)
and len(odfvs_to_update) > 0
Expand All @@ -438,8 +470,6 @@ def apply(
_validate_feature_views(
[*views_to_update, *odfvs_to_update, *request_views_to_update]
)
entities_to_update = [ob for ob in objects if isinstance(ob, Entity)]
services_to_update = [ob for ob in objects if isinstance(ob, FeatureService)]

# Make inferences
update_entities_with_inferred_types_from_feature_views(
Expand All @@ -456,19 +486,15 @@ def apply(
for odfv in odfvs_to_update:
odfv.infer_features()

if len(views_to_update) + len(entities_to_update) + len(
services_to_update
) + len(odfvs_to_update) + len(request_views_to_update) != len(objects):
raise ValueError("Unknown object type provided as part of apply() call")

# DUMMY_ENTITY is a placeholder entity used in entityless FeatureViews
# Handle all entityless feature views by using DUMMY_ENTITY as a placeholder entity.
DUMMY_ENTITY = Entity(
name=DUMMY_ENTITY_NAME,
join_key=DUMMY_ENTITY_ID,
value_type=ValueType.INT32,
)
entities_to_update.append(DUMMY_ENTITY)

# Add all objects to the registry and update the provider's infrastructure.
for view in itertools.chain(
views_to_update, odfvs_to_update, request_views_to_update
):
Expand All @@ -477,14 +503,62 @@ def apply(
self._registry.apply_entity(ent, project=self.project, commit=False)
for feature_service in services_to_update:
self._registry.apply_feature_service(feature_service, project=self.project)
for table in tables_to_update:
self._registry.apply_feature_table(table, project=self.project)

if not partial:
# Delete all registry objects that should not exist.
entities_to_delete = [
ob for ob in objects_to_delete if isinstance(ob, Entity)
]
views_to_delete = [
ob for ob in objects_to_delete if isinstance(ob, FeatureView)
]
request_views_to_delete = [
ob for ob in objects_to_delete if isinstance(ob, RequestFeatureView)
]
odfvs_to_delete = [
ob for ob in objects_to_delete if isinstance(ob, OnDemandFeatureView)
]
services_to_delete = [
ob for ob in objects_to_delete if isinstance(ob, FeatureService)
]
tables_to_delete = [
ob for ob in objects_to_delete if isinstance(ob, FeatureTable)
]

for entity in entities_to_delete:
self._registry.delete_entity(
entity.name, project=self.project, commit=False
)
for view in views_to_delete:
self._registry.delete_feature_view(
view.name, project=self.project, commit=False
)
for request_view in request_views_to_delete:
self._registry.delete_feature_view(
request_view.name, project=self.project, commit=False
)
for odfv in odfvs_to_delete:
self._registry.delete_feature_view(
odfv.name, project=self.project, commit=False
)
for service in services_to_delete:
self._registry.delete_feature_service(
service.name, project=self.project, commit=False
)
for table in tables_to_delete:
self._registry.delete_feature_table(
table.name, project=self.project, commit=False
)

self._get_provider().update_infra(
project=self.project,
tables_to_delete=[],
tables_to_keep=views_to_update,
entities_to_delete=[],
tables_to_delete=views_to_delete + tables_to_delete if not partial else [],
tables_to_keep=views_to_update + tables_to_update,
entities_to_delete=entities_to_delete if not partial else [],
entities_to_keep=entities_to_update,
partial=True,
partial=partial,
)

if commit:
Expand Down
26 changes: 26 additions & 0 deletions sdk/python/feast/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,32 @@ def delete_feature_view(self, name: str, project: str, commit: bool = True):

raise FeatureViewNotFoundException(name, project)

def delete_entity(self, name: str, project: str, commit: bool = True):
"""
Deletes an entity or raises an exception if not found.

Args:
name: Name of entity
project: Feast project that this entity belongs to
commit: Whether the change should be persisted immediately
"""
self._prepare_registry_for_changes()
assert self.cached_registry_proto

for idx, existing_entity_proto in enumerate(
self.cached_registry_proto.entities
):
if (
existing_entity_proto.spec.name == name
and existing_entity_proto.spec.project == project
):
del self.cached_registry_proto.entities[idx]
if commit:
self.commit()
return

raise EntityNotFoundException(name, project)

def commit(self):
"""Commits the state of the registry cache to the remote registry store."""
if self.cached_registry_proto:
Expand Down
98 changes: 42 additions & 56 deletions sdk/python/feast/repo_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@
from feast import Entity, FeatureTable
from feast.base_feature_view import BaseFeatureView
from feast.feature_service import FeatureService
from feast.feature_store import FeatureStore, _validate_feature_views
from feast.feature_store import FeatureStore
from feast.feature_view import FeatureView
from feast.infra.provider import get_provider
from feast.names import adjectives, animals
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.registry import Registry
Expand Down Expand Up @@ -142,20 +141,14 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation
registry._initialize_registry()
sys.dont_write_bytecode = True
repo = parse_repo(repo_path)
_validate_feature_views(
[
*list(repo.feature_views),
*list(repo.on_demand_feature_views),
*list(repo.request_feature_views),
]
)

if not skip_source_validation:
data_sources = [t.batch_source for t in repo.feature_views]
# Make sure the data source used by this feature view is supported by Feast
for data_source in data_sources:
data_source.validate(store.config)

# For each object in the registry, determine whether it should be kept or deleted.
entities_to_keep, entities_to_delete = _tag_registry_entities_for_keep_delete(
project, registry, repo
)
Expand All @@ -169,50 +162,60 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation
tables_to_keep, tables_to_delete = _tag_registry_tables_for_keep_delete(
project, registry, repo
)
(services_to_keep, services_to_delete,) = _tag_registry_services_for_keep_delete(
services_to_keep, services_to_delete = _tag_registry_services_for_keep_delete(
project, registry, repo
)

sys.dont_write_bytecode = False

# Delete views that should not exist
for registry_view in views_to_delete:
registry.delete_feature_view(registry_view.name, project=project, commit=False)
# Apply all changes to the registry and infrastructure.
all_to_apply: List[
Union[
Entity, BaseFeatureView, FeatureService, OnDemandFeatureView, FeatureTable
]
] = []
all_to_apply.extend(entities_to_keep)
all_to_apply.extend(views_to_keep)
all_to_apply.extend(services_to_keep)
all_to_apply.extend(odfvs_to_keep)
all_to_apply.extend(tables_to_keep)
all_to_delete: List[
Union[
Entity, BaseFeatureView, FeatureService, OnDemandFeatureView, FeatureTable
]
] = []
all_to_delete.extend(entities_to_delete)
all_to_delete.extend(views_to_delete)
all_to_delete.extend(services_to_delete)
all_to_delete.extend(odfvs_to_delete)
all_to_delete.extend(tables_to_delete)

store.apply(
all_to_apply, objects_to_delete=all_to_delete, partial=False, commit=False
)

for entity in entities_to_delete:
click.echo(
f"Deleted feature view {Style.BRIGHT + Fore.GREEN}{registry_view.name}{Style.RESET_ALL} from registry"
f"Deleted entity {Style.BRIGHT + Fore.GREEN}{entity.name}{Style.RESET_ALL} from registry"
)

# Delete feature services that should not exist
for feature_service_to_delete in services_to_delete:
registry.delete_feature_service(
feature_service_to_delete.name, project=project, commit=False
for view in views_to_delete:
click.echo(
f"Deleted feature view {Style.BRIGHT + Fore.GREEN}{view.name}{Style.RESET_ALL} from registry"
)
for odfv in odfvs_to_delete:
click.echo(
f"Deleted feature service {Style.BRIGHT + Fore.GREEN}{feature_service_to_delete.name}{Style.RESET_ALL} "
f"from registry"
f"Deleted on demand feature view {Style.BRIGHT + Fore.GREEN}{odfv.name}{Style.RESET_ALL} from registry"
)

# Delete tables that should not exist
for registry_table in tables_to_delete:
registry.delete_feature_table(
registry_table.name, project=project, commit=False
for table in tables_to_delete:
click.echo(
f"Deleted feature table {Style.BRIGHT + Fore.GREEN}{table.name}{Style.RESET_ALL} from registry"
)
for feature_service in services_to_delete:
click.echo(
f"Deleted feature table {Style.BRIGHT + Fore.GREEN}{registry_table.name}{Style.RESET_ALL} from registry"
f"Deleted feature service {Style.BRIGHT + Fore.GREEN}{feature_service.name}{Style.RESET_ALL} "
f"from registry"
)

# TODO: delete entities from the registry too

# Add / update views + entities + services
all_to_apply: List[
Union[Entity, BaseFeatureView, FeatureService, OnDemandFeatureView]
] = []
all_to_apply.extend(entities_to_keep)
all_to_apply.extend(views_to_keep)
all_to_apply.extend(services_to_keep)
all_to_apply.extend(odfvs_to_keep)
# TODO: delete odfvs
store.apply(all_to_apply, commit=False)
for entity in entities_to_keep:
click.echo(
f"Registered entity {Style.BRIGHT + Fore.GREEN}{entity.name}{Style.RESET_ALL}"
Expand All @@ -231,12 +234,10 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation
)
# Create tables that should exist
for table in tables_to_keep:
registry.apply_feature_table(table, project, commit=False)
click.echo(
f"Registered feature table {Style.BRIGHT + Fore.GREEN}{table.name}{Style.RESET_ALL}"
)

infra_provider = get_provider(repo_config, repo_path)
views_to_keep_in_infra = [
view for view in views_to_keep if isinstance(view, FeatureView)
]
Expand All @@ -257,21 +258,6 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation
)
# TODO: consider echoing also entities being deployed/removed

all_to_delete: List[Union[FeatureTable, FeatureView]] = []
all_to_delete.extend(tables_to_delete)
all_to_delete.extend(views_to_delete_from_infra)
all_to_keep: List[Union[FeatureTable, FeatureView]] = []
all_to_keep.extend(tables_to_keep)
all_to_keep.extend(views_to_keep_in_infra)
infra_provider.update_infra(
project,
tables_to_delete=all_to_delete,
tables_to_keep=all_to_keep,
entities_to_delete=list(entities_to_delete),
entities_to_keep=list(entities_to_keep),
partial=False,
)

# Commit the update to the registry only after successful infra update
registry.commit()

Expand Down
4 changes: 4 additions & 0 deletions sdk/python/tests/integration/registration/test_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ def test_apply_entity_success(test_registry):
and entity.labels["team"] == "matchmaking"
)

test_registry.delete_entity("driver_car_id", project)
entities = test_registry.list_entities(project)
assert len(entities) == 0

test_registry.teardown()

# Will try to reload registry, which will fail because the file has been deleted
Expand Down