diff --git a/.github/workflows/pr_integration_tests.yml b/.github/workflows/pr_integration_tests.yml index a720865ed4..313bb142b3 100644 --- a/.github/workflows/pr_integration_tests.yml +++ b/.github/workflows/pr_integration_tests.yml @@ -170,7 +170,7 @@ jobs: SNOWFLAKE_CI_PASSWORD: ${{ secrets.SNOWFLAKE_CI_PASSWORD }} SNOWFLAKE_CI_ROLE: ${{ secrets.SNOWFLAKE_CI_ROLE }} SNOWFLAKE_CI_WAREHOUSE: ${{ secrets.SNOWFLAKE_CI_WAREHOUSE }} - run: pytest -n 8 --cov=./ --cov-report=xml --verbose --color=yes sdk/python/tests --integration --durations=5 + run: pytest -n 8 --cov=./ --cov-report=xml --color=yes sdk/python/tests --integration --durations=5 - name: Upload coverage to Codecov uses: codecov/codecov-action@v1 with: diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml index ea6141f331..9d6fc92aa2 100644 --- a/.github/workflows/unit_tests.yml +++ b/.github/workflows/unit_tests.yml @@ -66,7 +66,7 @@ jobs: SNOWFLAKE_CI_PASSWORD: ${{ secrets.SNOWFLAKE_CI_PASSWORD }} SNOWFLAKE_CI_ROLE: ${{ secrets.SNOWFLAKE_CI_ROLE }} SNOWFLAKE_CI_WAREHOUSE: ${{ secrets.SNOWFLAKE_CI_WAREHOUSE }} - run: FEAST_USAGE=False pytest -n 8 --cov=./ --cov-report=xml --verbose --color=yes sdk/python/tests + run: FEAST_USAGE=False pytest -n 8 --cov=./ --cov-report=xml --color=yes sdk/python/tests - name: Upload coverage to Codecov uses: codecov/codecov-action@v1 with: diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 119b86d6c1..7824db4a39 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -646,7 +646,8 @@ def _plan( self._registry.refresh() current_infra_proto = ( self._registry.cached_registry_proto.infra.__deepcopy__() - if self._registry.cached_registry_proto + if hasattr(self._registry, "cached_registry_proto") + and self._registry.cached_registry_proto else InfraProto() ) desired_registry_proto = desired_repo_contents.to_registry_proto() diff --git 
a/sdk/python/feast/infra/registry_stores/sql.py b/sdk/python/feast/infra/registry_stores/sql.py index f13428b737..1a45dec68a 100644 --- a/sdk/python/feast/infra/registry_stores/sql.py +++ b/sdk/python/feast/infra/registry_stores/sql.py @@ -48,10 +48,11 @@ from feast.protos.feast.core.ValidationProfile_pb2 import ( ValidationReference as ValidationReferenceProto, ) -from feast.registry import Registry +from feast.registry import BaseRegistry from feast.repo_config import RegistryConfig from feast.request_feature_view import RequestFeatureView from feast.saved_dataset import SavedDataset, ValidationReference +from feast.stream_feature_view import StreamFeatureView metadata = MetaData() @@ -121,7 +122,7 @@ ) -class SqlRegistry(Registry): +class SqlRegistry(BaseRegistry): def __init__( self, registry_config: Optional[RegistryConfig], repo_path: Optional[Path] ): @@ -152,6 +153,11 @@ def teardown(self): def refresh(self): pass + def list_stream_feature_views( + self, project: str, allow_cache: bool = False + ) -> List[StreamFeatureView]: + return [] + def apply_entity(self, entity: Entity, project: str, commit: bool = True): return self._apply_object(entities, "entity_name", entity, "entity_proto") @@ -389,6 +395,22 @@ def apply_validation_reference( "validation_reference_proto", ) + def apply_materialization( + self, + feature_view: FeatureView, + project: str, + start_date: datetime, + end_date: datetime, + commit: bool = True, + ): + pass + + def delete_validation_reference(self, name: str, project: str, commit: bool = True): + pass + + def commit(self): + pass + def _apply_object( self, table, id_field_name, obj, proto_field_name, ): diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index dc52013bff..fe37aa8dc2 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -11,8 +11,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and # limitations under the License. +import abc import json import logging +from abc import abstractmethod from collections import defaultdict from datetime import datetime, timedelta from enum import Enum @@ -145,7 +147,444 @@ def get_registry_store_class_from_scheme(registry_path: str): return get_registry_store_class_from_type(registry_store_type) -class Registry: +class BaseRegistry(abc.ABC): + # Entity operations + @abstractmethod + def apply_entity(self, entity: Entity, project: str, commit: bool = True): + """ + Registers a single entity with Feast + + Args: + entity: Entity that will be registered + project: Feast project that this entity belongs to + commit: Whether the change should be persisted immediately + """ + + @abstractmethod + def delete_entity(self, name: str, project: str, commit: bool = True): + """ + Deletes an entity or raises an exception if not found. + + Args: + name: Name of entity + project: Feast project that this entity belongs to + commit: Whether the change should be persisted immediately + """ + + @abstractmethod + def get_entity(self, name: str, project: str, allow_cache: bool = False) -> Entity: + """ + Retrieves an entity. 
+ + Args: + name: Name of entity + project: Feast project that this entity belongs to + allow_cache: Whether to allow returning this entity from a cached registry + + Returns: + Returns either the specified entity, or raises an exception if + none is found + """ + + @abstractmethod + def list_entities(self, project: str, allow_cache: bool = False) -> List[Entity]: + """ + Retrieve a list of entities from the registry + + Args: + allow_cache: Whether to allow returning entities from a cached registry + project: Filter entities based on project name + + Returns: + List of entities + """ + + # Data source operations + @abstractmethod + def apply_data_source( + self, data_source: DataSource, project: str, commit: bool = True + ): + """ + Registers a single data source with Feast + + Args: + data_source: A data source that will be registered + project: Feast project that this data source belongs to + commit: Whether to immediately commit to the registry + """ + + @abstractmethod + def delete_data_source(self, name: str, project: str, commit: bool = True): + """ + Deletes a data source or raises an exception if not found. + + Args: + name: Name of data source + project: Feast project that this data source belongs to + commit: Whether the change should be persisted immediately + """ + + @abstractmethod + def get_data_source( + self, name: str, project: str, allow_cache: bool = False + ) -> DataSource: + """ + Retrieves a data source. 
+ + Args: + name: Name of data source + project: Feast project that this data source belongs to + allow_cache: Whether to allow returning this data source from a cached registry + + Returns: + Returns either the specified data source, or raises an exception if none is found + """ + + @abstractmethod + def list_data_sources( + self, project: str, allow_cache: bool = False + ) -> List[DataSource]: + """ + Retrieve a list of data sources from the registry + + Args: + project: Filter data sources based on project name + allow_cache: Whether to allow returning data sources from a cached registry + + Returns: + List of data sources + """ + + # Feature service operations + @abstractmethod + def apply_feature_service( + self, feature_service: FeatureService, project: str, commit: bool = True + ): + """ + Registers a single feature service with Feast + + Args: + feature_service: A feature service that will be registered + project: Feast project that this feature service belongs to + """ + + @abstractmethod + def delete_feature_service(self, name: str, project: str, commit: bool = True): + """ + Deletes a feature service or raises an exception if not found. + + Args: + name: Name of feature service + project: Feast project that this feature service belongs to + commit: Whether the change should be persisted immediately + """ + + @abstractmethod + def get_feature_service( + self, name: str, project: str, allow_cache: bool = False + ) -> FeatureService: + """ + Retrieves a feature service. 
+ + Args: + name: Name of feature service + project: Feast project that this feature service belongs to + allow_cache: Whether to allow returning this feature service from a cached registry + + Returns: + Returns either the specified feature service, or raises an exception if + none is found + """ + + @abstractmethod + def list_feature_services( + self, project: str, allow_cache: bool = False + ) -> List[FeatureService]: + """ + Retrieve a list of feature services from the registry + + Args: + allow_cache: Whether to allow returning feature services from a cached registry + project: Filter feature services based on project name + + Returns: + List of feature services + """ + + # Feature view operations + @abstractmethod + def apply_feature_view( + self, feature_view: BaseFeatureView, project: str, commit: bool = True + ): + """ + Registers a single feature view with Feast + + Args: + feature_view: Feature view that will be registered + project: Feast project that this feature view belongs to + commit: Whether the change should be persisted immediately + """ + + @abstractmethod + def delete_feature_view(self, name: str, project: str, commit: bool = True): + """ + Deletes a feature view or raises an exception if not found. + + Args: + name: Name of feature view + project: Feast project that this feature view belongs to + commit: Whether the change should be persisted immediately + """ + + # stream feature view operations + # TODO: Needs to be implemented. + # def get_stream_feature_view(self): + # ... 
+ + @abstractmethod + def list_stream_feature_views( + self, project: str, allow_cache: bool = False + ) -> List[StreamFeatureView]: + """ + Retrieve a list of stream feature views from the registry + + Args: + project: Filter stream feature views based on project name + allow_cache: Whether to allow returning stream feature views from a cached registry + + Returns: + List of stream feature views + """ + + # on demand feature view operations + @abstractmethod + def get_on_demand_feature_view( + self, name: str, project: str, allow_cache: bool = False + ) -> OnDemandFeatureView: + """ + Retrieves an on demand feature view. + + Args: + name: Name of on demand feature view + project: Feast project that this on demand feature view belongs to + allow_cache: Whether to allow returning this on demand feature view from a cached registry + + Returns: + Returns either the specified on demand feature view, or raises an exception if + none is found + """ + + @abstractmethod + def list_on_demand_feature_views( + self, project: str, allow_cache: bool = False + ) -> List[OnDemandFeatureView]: + """ + Retrieve a list of on demand feature views from the registry + + Args: + project: Filter on demand feature views based on project name + allow_cache: Whether to allow returning on demand feature views from a cached registry + + Returns: + List of on demand feature views + """ + + # regular feature view operations + @abstractmethod + def get_feature_view( + self, name: str, project: str, allow_cache: bool = False + ) -> FeatureView: + """ + Retrieves a feature view. 
+ + Args: + name: Name of feature view + project: Feast project that this feature view belongs to + allow_cache: Allow returning feature view from the cached registry + + Returns: + Returns either the specified feature view, or raises an exception if + none is found + """ + + @abstractmethod + def list_feature_views( + self, project: str, allow_cache: bool = False + ) -> List[FeatureView]: + """ + Retrieve a list of feature views from the registry + + Args: + allow_cache: Allow returning feature views from the cached registry + project: Filter feature views based on project name + + Returns: + List of feature views + """ + + # request feature view operations + # TODO: Needs to be implemented. + # @abstractmethod + # def get_request_feature_view(self, name: str, project: str): + # ... + + @abstractmethod + def list_request_feature_views( + self, project: str, allow_cache: bool = False + ) -> List[RequestFeatureView]: + """ + Retrieve a list of request feature views from the registry + + Args: + allow_cache: Allow returning feature views from the cached registry + project: Filter feature views based on project name + + Returns: + List of request feature views + """ + + @abstractmethod + def apply_materialization( + self, + feature_view: FeatureView, + project: str, + start_date: datetime, + end_date: datetime, + commit: bool = True, + ): + """ + Updates materialization intervals tracked for a single feature view in Feast + + Args: + feature_view: Feature view that will be updated with an additional materialization interval tracked + project: Feast project that this feature view belongs to + start_date (datetime): Start date of the materialization interval to track + end_date (datetime): End date of the materialization interval to track + commit: Whether the change should be persisted immediately + """ + + # Saved dataset operations + @abstractmethod + def apply_saved_dataset( + self, saved_dataset: SavedDataset, project: str, commit: bool = True, + ): + """ + Stores 
a saved dataset metadata with Feast + + Args: + saved_dataset: SavedDataset that will be added / updated to registry + project: Feast project that this dataset belongs to + commit: Whether the change should be persisted immediately + """ + + @abstractmethod + def get_saved_dataset( + self, name: str, project: str, allow_cache: bool = False + ) -> SavedDataset: + """ + Retrieves a saved dataset. + + Args: + name: Name of dataset + project: Feast project that this dataset belongs to + allow_cache: Whether to allow returning this dataset from a cached registry + + Returns: + Returns either the specified SavedDataset, or raises an exception if + none is found + """ + + # TODO: Needs to be implemented. + # def delete_saved_dataset( + # self, name: str, project: str, allow_cache: bool = False + # ): + # """ + # Retrieves a saved dataset. + # + # Args: + # name: Name of dataset + # project: Feast project that this dataset belongs to + # allow_cache: Whether to allow returning this dataset from a cached registry + # + # Returns: + # Returns either the specified SavedDataset, or raises an exception if + # none is found + # """ + + @abstractmethod + def list_saved_datasets( + self, project: str, allow_cache: bool = False + ) -> List[SavedDataset]: + """ + Retrieves a list of all saved datasets in specified project + + Args: + project: Feast project + allow_cache: Whether to allow returning saved datasets from a cached registry + + Returns: + Returns the list of SavedDatasets + """ + + # Validation reference operations + @abstractmethod + def apply_validation_reference( + self, + validation_reference: ValidationReference, + project: str, + commit: bool = True, + ): + """ + Persist a validation reference + + Args: + validation_reference: ValidationReference that will be added / updated to registry + project: Feast project that this validation reference belongs to + commit: Whether the change should be persisted immediately + """ + + @abstractmethod + def delete_validation_reference(self, 
name: str, project: str, commit: bool = True): + """ + Deletes a validation reference or raises an exception if not found. + + Args: + name: Name of validation reference + project: Feast project that this object belongs to + commit: Whether the change should be persisted immediately + """ + + @abstractmethod + def get_validation_reference( + self, name: str, project: str, allow_cache: bool = False + ) -> ValidationReference: + """ + Retrieves a validation reference. + + Args: + name: Name of validation reference + project: Feast project that this validation reference belongs to + allow_cache: Whether to allow returning this validation reference from a cached registry + + Returns: + Returns either the specified ValidationReference, or raises an exception if + none is found + """ + + # TODO: Needs to be implemented. + # def list_validation_references(self): + # ... + + @abstractmethod + def commit(self): + """Commits the state of the registry cache to the remote registry store.""" + + @abstractmethod + def refresh(self): + """Refreshes the state of the registry cache by fetching the registry state from the remote registry store.""" + + +class Registry(BaseRegistry): """ Registry: A registry allows for the management and persistence of feature definitions and related metadata. """