Skip to content

Commit

Permalink
Use set when parsing repos to prevent duplicates (#1913)
Browse files Browse the repository at this point in the history
* Use set when parsing repos to prevent duplicates

Signed-off-by: Achal Shah <[email protected]>

* Undo #1905 changes

Signed-off-by: Achal Shah <[email protected]>

* use id() in addition to name to differentiate different objects with the same name

Signed-off-by: Achal Shah <[email protected]>

* add a test

Signed-off-by: Achal Shah <[email protected]>

* add one more test

Signed-off-by: Achal Shah <[email protected]>

* remove debugging

Signed-off-by: Achal Shah <[email protected]>
  • Loading branch information
achals authored Sep 30, 2021
1 parent ce5a130 commit 5d0f37b
Show file tree
Hide file tree
Showing 9 changed files with 143 additions and 85 deletions.
7 changes: 2 additions & 5 deletions sdk/python/feast/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
from datetime import datetime
from typing import Dict, Optional

import yaml
from google.protobuf import json_format
from google.protobuf.json_format import MessageToDict, MessageToJson

from feast.importer import get_calling_file_name
from feast.loaders import yaml as feast_yaml
from feast.protos.feast.core.Entity_pb2 import Entity as EntityV2Proto
from feast.protos.feast.core.Entity_pb2 import EntityMeta as EntityMetaProto
Expand Down Expand Up @@ -50,8 +48,6 @@ class Entity:
_created_timestamp: Optional[datetime]
_last_updated_timestamp: Optional[datetime]

defined_in: str

@log_exceptions
def __init__(
self,
Expand All @@ -78,7 +74,8 @@ def __init__(
self._created_timestamp: Optional[datetime] = None
self._last_updated_timestamp: Optional[datetime] = None

self.defined_in = get_calling_file_name(inspect.stack())
def __hash__(self) -> int:
return hash((id(self), self.name))

def __eq__(self, other):
if not isinstance(other, Entity):
Expand Down
8 changes: 1 addition & 7 deletions sdk/python/feast/feature_service.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import inspect
from datetime import datetime
from typing import Dict, List, Optional, Union

Expand All @@ -7,7 +6,6 @@
from feast.feature_table import FeatureTable
from feast.feature_view import FeatureView
from feast.feature_view_projection import FeatureViewProjection
from feast.importer import get_calling_file_name
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.protos.feast.core.FeatureService_pb2 import (
FeatureService as FeatureServiceProto,
Expand Down Expand Up @@ -39,8 +37,6 @@ class FeatureService:
created_timestamp: Optional[datetime] = None
last_updated_timestamp: Optional[datetime] = None

defined_in: str

@log_exceptions
def __init__(
self,
Expand Down Expand Up @@ -75,8 +71,6 @@ def __init__(
self.created_timestamp = None
self.last_updated_timestamp = None

self.defined_in = get_calling_file_name(inspect.stack())

def __repr__(self):
items = (f"{k} = {v}" for k, v in self.__dict__.items())
return f"<{self.__class__.__name__}({', '.join(items)})>"
Expand All @@ -85,7 +79,7 @@ def __str__(self):
return str(MessageToJson(self.to_proto()))

def __hash__(self):
return hash(self.name)
return hash((id(self), self.name))

def __eq__(self, other):
if not isinstance(other, FeatureService):
Expand Down
7 changes: 3 additions & 4 deletions sdk/python/feast/feature_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
from typing import Dict, List, MutableMapping, Optional, Union

import yaml
Expand All @@ -22,7 +21,6 @@

from feast.data_source import DataSource, KafkaSource, KinesisSource
from feast.feature import Feature
from feast.importer import get_calling_file_name
from feast.loaders import yaml as feast_yaml
from feast.protos.feast.core.FeatureTable_pb2 import FeatureTable as FeatureTableProto
from feast.protos.feast.core.FeatureTable_pb2 import (
Expand Down Expand Up @@ -67,11 +65,12 @@ def __init__(
self._created_timestamp: Optional[Timestamp] = None
self._last_updated_timestamp: Optional[Timestamp] = None

self.defined_in = get_calling_file_name(inspect.stack())

def __str__(self):
return str(MessageToJson(self.to_proto()))

def __hash__(self) -> int:
return hash((id(self), self.name))

def __eq__(self, other):
if not isinstance(other, FeatureTable):
raise TypeError(
Expand Down
8 changes: 1 addition & 7 deletions sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
import re
import warnings
from datetime import datetime, timedelta
Expand All @@ -25,7 +24,6 @@
from feast.errors import RegistryInferenceFailure
from feast.feature import Feature
from feast.feature_view_projection import FeatureViewProjection
from feast.importer import get_calling_file_name
from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
from feast.protos.feast.core.FeatureView_pb2 import (
FeatureViewMeta as FeatureViewMetaProto,
Expand Down Expand Up @@ -81,8 +79,6 @@ class FeatureView:
last_updated_timestamp: Optional[datetime] = None
materialization_intervals: List[Tuple[datetime, datetime]]

defined_in: str

@log_exceptions
def __init__(
self,
Expand Down Expand Up @@ -145,8 +141,6 @@ def __init__(
self.created_timestamp: Optional[datetime] = None
self.last_updated_timestamp: Optional[datetime] = None

self.defined_in = get_calling_file_name(inspect.stack())

def __repr__(self):
items = (f"{k} = {v}" for k, v in self.__dict__.items())
return f"<{self.__class__.__name__}({', '.join(items)})>"
Expand All @@ -155,7 +149,7 @@ def __str__(self):
return str(MessageToJson(self.to_proto()))

def __hash__(self):
return hash(self.name)
return hash((id(self), self.name))

def __getitem__(self, item) -> FeatureViewProjection:
assert isinstance(item, list)
Expand Down
8 changes: 0 additions & 8 deletions sdk/python/feast/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,6 @@
from feast import errors


def get_calling_file_name(stack) -> str:
# Get two levels up from current, to ignore usage.py
previous_stack_frame = stack[1]
if "feast/usage.py" in previous_stack_frame.filename:
previous_stack_frame = stack[2]
return previous_stack_frame.filename


def get_class_from_type(module_name: str, class_name: str, class_type: str):
if not class_name.endswith(class_type):
raise errors.FeastClassInvalidName(class_name, class_type)
Expand Down
7 changes: 2 additions & 5 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import functools
import inspect
from types import MethodType
from typing import Dict, List, Union, cast

Expand All @@ -12,7 +11,6 @@
from feast.feature import Feature
from feast.feature_view import FeatureView
from feast.feature_view_projection import FeatureViewProjection
from feast.importer import get_calling_file_name
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
OnDemandFeatureView as OnDemandFeatureViewProto,
)
Expand Down Expand Up @@ -48,8 +46,6 @@ class OnDemandFeatureView:
inputs: Dict[str, Union[FeatureView, RequestDataSource]]
udf: MethodType

defined_in: str

@log_exceptions
def __init__(
self,
Expand All @@ -67,7 +63,8 @@ def __init__(
self.inputs = inputs
self.udf = udf

self.defined_in = get_calling_file_name(inspect.stack())
def __hash__(self) -> int:
return hash((id(self), self.name))

def to_proto(self) -> OnDemandFeatureViewProto:
"""
Expand Down
86 changes: 38 additions & 48 deletions sdk/python/feast/repo_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ def py_path_to_module(path: Path, repo_root: Path) -> str:


class ParsedRepo(NamedTuple):
feature_tables: List[FeatureTable]
feature_views: List[FeatureView]
on_demand_feature_views: List[OnDemandFeatureView]
entities: List[Entity]
feature_services: List[FeatureService]
feature_tables: Set[FeatureTable]
feature_views: Set[FeatureView]
on_demand_feature_views: Set[OnDemandFeatureView]
entities: Set[Entity]
feature_services: Set[FeatureService]


def read_feastignore(repo_root: Path) -> List[str]:
Expand Down Expand Up @@ -94,11 +94,11 @@ def get_repo_files(repo_root: Path) -> List[Path]:
def parse_repo(repo_root: Path) -> ParsedRepo:
""" Collect feature table definitions from feature repo """
res = ParsedRepo(
feature_tables=[],
entities=[],
feature_views=[],
feature_services=[],
on_demand_feature_views=[],
feature_tables=set(),
entities=set(),
feature_views=set(),
feature_services=set(),
on_demand_feature_views=set(),
)

for repo_file in get_repo_files(repo_root):
Expand All @@ -107,25 +107,15 @@ def parse_repo(repo_root: Path) -> ParsedRepo:
for attr_name in dir(module):
obj = getattr(module, attr_name)
if isinstance(obj, FeatureTable):
assert obj.defined_in is not None
if obj.defined_in == module.__file__:
res.feature_tables.append(obj)
res.feature_tables.add(obj)
if isinstance(obj, FeatureView):
assert obj.defined_in is not None
if obj.defined_in == module.__file__:
res.feature_views.append(obj)
res.feature_views.add(obj)
elif isinstance(obj, Entity):
assert obj.defined_in is not None
if obj.defined_in == module.__file__:
res.entities.append(obj)
res.entities.add(obj)
elif isinstance(obj, FeatureService):
assert obj.defined_in is not None
if obj.defined_in == module.__file__:
res.feature_services.append(obj)
res.feature_services.add(obj)
elif isinstance(obj, OnDemandFeatureView):
assert obj.defined_in is not None
if obj.defined_in == module.__file__:
res.on_demand_feature_views.append(obj)
res.on_demand_feature_views.add(obj)
return res


Expand All @@ -146,7 +136,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation
registry._initialize_registry()
sys.dont_write_bytecode = True
repo = parse_repo(repo_path)
_validate_feature_views(repo.feature_views)
_validate_feature_views(list(repo.feature_views))

if not skip_source_validation:
data_sources = [t.batch_source for t in repo.feature_views]
Expand Down Expand Up @@ -259,8 +249,8 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation
project,
tables_to_delete=all_to_delete,
tables_to_keep=all_to_keep,
entities_to_delete=entities_to_delete,
entities_to_keep=entities_to_keep,
entities_to_delete=list(entities_to_delete),
entities_to_keep=list(entities_to_keep),
partial=False,
)

Expand All @@ -270,63 +260,63 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation

def _tag_registry_entities_for_keep_delete(
project: str, registry: Registry, repo: ParsedRepo
) -> Tuple[List[Entity], List[Entity]]:
entities_to_keep: List[Entity] = repo.entities
entities_to_delete: List[Entity] = []
) -> Tuple[Set[Entity], Set[Entity]]:
entities_to_keep: Set[Entity] = repo.entities
entities_to_delete: Set[Entity] = set()
repo_entities_names = set([e.name for e in repo.entities])
for registry_entity in registry.list_entities(project=project):
if registry_entity.name not in repo_entities_names:
entities_to_delete.append(registry_entity)
entities_to_delete.add(registry_entity)
return entities_to_keep, entities_to_delete


def _tag_registry_views_for_keep_delete(
project: str, registry: Registry, repo: ParsedRepo
) -> Tuple[List[FeatureView], List[FeatureView]]:
views_to_keep: List[FeatureView] = repo.feature_views
views_to_delete: List[FeatureView] = []
) -> Tuple[Set[FeatureView], Set[FeatureView]]:
views_to_keep: Set[FeatureView] = repo.feature_views
views_to_delete: Set[FeatureView] = set()
repo_feature_view_names = set(t.name for t in repo.feature_views)
for registry_view in registry.list_feature_views(project=project):
if registry_view.name not in repo_feature_view_names:
views_to_delete.append(registry_view)
views_to_delete.add(registry_view)
return views_to_keep, views_to_delete


def _tag_registry_on_demand_feature_views_for_keep_delete(
project: str, registry: Registry, repo: ParsedRepo
) -> Tuple[List[OnDemandFeatureView], List[OnDemandFeatureView]]:
odfvs_to_keep: List[OnDemandFeatureView] = repo.on_demand_feature_views
odfvs_to_delete: List[OnDemandFeatureView] = []
) -> Tuple[Set[OnDemandFeatureView], Set[OnDemandFeatureView]]:
odfvs_to_keep: Set[OnDemandFeatureView] = repo.on_demand_feature_views
odfvs_to_delete: Set[OnDemandFeatureView] = set()
repo_on_demand_feature_view_names = set(
t.name for t in repo.on_demand_feature_views
)
for registry_odfv in registry.list_on_demand_feature_views(project=project):
if registry_odfv.name not in repo_on_demand_feature_view_names:
odfvs_to_delete.append(registry_odfv)
odfvs_to_delete.add(registry_odfv)
return odfvs_to_keep, odfvs_to_delete


def _tag_registry_tables_for_keep_delete(
project: str, registry: Registry, repo: ParsedRepo
) -> Tuple[List[FeatureTable], List[FeatureTable]]:
tables_to_keep: List[FeatureTable] = repo.feature_tables
tables_to_delete: List[FeatureTable] = []
) -> Tuple[Set[FeatureTable], Set[FeatureTable]]:
tables_to_keep: Set[FeatureTable] = repo.feature_tables
tables_to_delete: Set[FeatureTable] = set()
repo_table_names = set(t.name for t in repo.feature_tables)
for registry_table in registry.list_feature_tables(project=project):
if registry_table.name not in repo_table_names:
tables_to_delete.append(registry_table)
tables_to_delete.add(registry_table)
return tables_to_keep, tables_to_delete


def _tag_registry_services_for_keep_delete(
project: str, registry: Registry, repo: ParsedRepo
) -> Tuple[List[FeatureService], List[FeatureService]]:
services_to_keep: List[FeatureService] = repo.feature_services
services_to_delete: List[FeatureService] = []
) -> Tuple[Set[FeatureService], Set[FeatureService]]:
services_to_keep: Set[FeatureService] = repo.feature_services
services_to_delete: Set[FeatureService] = set()
repo_feature_service_names = set(t.name for t in repo.feature_services)
for registry_service in registry.list_feature_services(project=project):
if registry_service.name not in repo_feature_service_names:
services_to_delete.append(registry_service)
services_to_delete.add(registry_service)
return services_to_keep, services_to_delete


Expand Down
Loading

0 comments on commit 5d0f37b

Please sign in to comment.