diff --git a/docs/how-to-guides/customizing-feast/creating-a-custom-provider.md b/docs/how-to-guides/customizing-feast/creating-a-custom-provider.md index 027ca20c39..f2bc3f8327 100644 --- a/docs/how-to-guides/customizing-feast/creating-a-custom-provider.md +++ b/docs/how-to-guides/customizing-feast/creating-a-custom-provider.md @@ -37,7 +37,7 @@ from feast.infra.local import LocalProvider from feast.infra.offline_stores.offline_store import RetrievalJob from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto -from feast.registry import Registry +from feast.infra.registry.registry import Registry from feast.repo_config import RepoConfig diff --git a/go/internal/feast/registry/local.go b/go/internal/feast/registry/local.go index 8b35e5756b..124fcba3ed 100644 --- a/go/internal/feast/registry/local.go +++ b/go/internal/feast/registry/local.go @@ -12,15 +12,15 @@ import ( "github.com/feast-dev/feast/go/protos/feast/core" ) -// A LocalRegistryStore is a file-based implementation of the RegistryStore interface. -type LocalRegistryStore struct { +// A FileRegistryStore is a file-based implementation of the RegistryStore interface. +type FileRegistryStore struct { filePath string } -// NewLocalRegistryStore creates a LocalRegistryStore with the given configuration and infers +// NewFileRegistryStore creates a FileRegistryStore with the given configuration and infers // the file path from the repo path and registry path. -func NewLocalRegistryStore(config *RegistryConfig, repoPath string) *LocalRegistryStore { - lr := LocalRegistryStore{} +func NewFileRegistryStore(config *RegistryConfig, repoPath string) *FileRegistryStore { + lr := FileRegistryStore{} registryPath := config.Path if filepath.IsAbs(registryPath) { lr.filePath = registryPath @@ -31,7 +31,7 @@ func NewLocalRegistryStore(config *RegistryConfig, repoPath string) *LocalRegist } // GetRegistryProto reads and parses the registry proto from the file path. -func (r *LocalRegistryStore) GetRegistryProto() (*core.Registry, error) { +func (r *FileRegistryStore) GetRegistryProto() (*core.Registry, error) { registry := &core.Registry{} in, err := ioutil.ReadFile(r.filePath) if err != nil { @@ -43,15 +43,15 @@ func (r *LocalRegistryStore) GetRegistryProto() (*core.Registry, error) { return registry, nil } -func (r *LocalRegistryStore) UpdateRegistryProto(rp *core.Registry) error { +func (r *FileRegistryStore) UpdateRegistryProto(rp *core.Registry) error { return r.writeRegistry(rp) } -func (r *LocalRegistryStore) Teardown() error { +func (r *FileRegistryStore) Teardown() error { return os.Remove(r.filePath) } -func (r *LocalRegistryStore) writeRegistry(rp *core.Registry) error { +func (r *FileRegistryStore) writeRegistry(rp *core.Registry) error { rp.VersionId = uuid.New().String() rp.LastUpdated = timestamppb.Now() bytes, err := proto.Marshal(rp) diff --git a/go/internal/feast/registry/registry.go b/go/internal/feast/registry/registry.go index c67a50a5a6..9d0684d023 100644 --- a/go/internal/feast/registry/registry.go +++ b/go/internal/feast/registry/registry.go @@ -16,8 +16,8 @@ var REGISTRY_SCHEMA_VERSION string = "1" var REGISTRY_STORE_CLASS_FOR_SCHEME map[string]string = map[string]string{ "gs": "GCSRegistryStore", "s3": "S3RegistryStore", - "file": "LocalRegistryStore", - "": "LocalRegistryStore", + "file": "FileRegistryStore", + "": "FileRegistryStore", } /* @@ -335,8 +335,8 @@ func getRegistryStoreFromScheme(registryPath string, registryConfig *RegistryCon func getRegistryStoreFromType(registryStoreType string, registryConfig *RegistryConfig, repoPath string) (RegistryStore, error) { switch registryStoreType { - case "LocalRegistryStore": - return NewLocalRegistryStore(registryConfig, repoPath), nil + case "FileRegistryStore": + return NewFileRegistryStore(registryConfig, repoPath), nil } - return nil, errors.New("only LocalRegistryStore as a RegistryStore is supported at this moment") + return nil, errors.New("only FileRegistryStore as a RegistryStore is supported at this moment") } diff --git a/sdk/python/docs/index.rst b/sdk/python/docs/index.rst index b1e75332b0..58823d2fe1 100644 --- a/sdk/python/docs/index.rst +++ b/sdk/python/docs/index.rst @@ -96,6 +96,12 @@ Entity Feature View ================== +.. automodule:: feast.base_feature_view + :members: + +Feature View +---------------------- + .. automodule:: feast.feature_view :members: @@ -128,32 +134,59 @@ Feature Service Registry ================== -.. automodule:: feast.registry +.. automodule:: feast.infra.registry.base_registry + :inherited-members: + :members: + +Registry +---------------------- + +.. automodule:: feast.infra.registry.registry + :inherited-members: + :members: + +SQL Registry +---------------------- + +.. automodule:: feast.infra.registry.sql :inherited-members: :members: Registry Store ================== -.. automodule:: feast.registry_store +.. automodule:: feast.infra.registry.registry_store :inherited-members: :members: :exclude-members: NoopRegistryStore -SQL Registry Store +File Registry Store ----------------------- -.. automodule:: feast.infra.registry_stores.sql +.. automodule:: feast.infra.registry.file :members: :noindex: -PostgreSQL Registry Store +GCS Registry Store ----------------------- -.. automodule:: feast.infra.registry_stores.contrib.postgres.registry_store +.. automodule:: feast.infra.registry.gcs :members: :noindex: +S3 Registry Store +----------------------- + +.. automodule:: feast.infra.registry.s3 + :members: + :noindex: + +PostgreSQL Registry Store +----------------------- + +.. automodule:: feast.infra.registry.contrib.postgres.postgres_registry_store + :members: + :noindex: Provider ================== @@ -173,21 +206,18 @@ Local Provider .. automodule:: feast.infra.local :members: - :exclude-members: LocalRegistryStore GCP Provider ------------------ .. automodule:: feast.infra.gcp :members: - :exclude-members: GCSRegistryStore AWS Provider ------------------ .. automodule:: feast.infra.aws :members: - :exclude-members: S3RegistryStore Offline Store ================== diff --git a/sdk/python/docs/source/feast.infra.offline_stores.contrib.athena_offline_store.rst b/sdk/python/docs/source/feast.infra.offline_stores.contrib.athena_offline_store.rst new file mode 100644 index 0000000000..d2275b2b39 --- /dev/null +++ b/sdk/python/docs/source/feast.infra.offline_stores.contrib.athena_offline_store.rst @@ -0,0 +1,37 @@ +feast.infra.offline\_stores.contrib.athena\_offline\_store package +================================================================== + +Subpackages +----------- + +.. toctree:: + :maxdepth: 4 + + feast.infra.offline_stores.contrib.athena_offline_store.tests + +Submodules +---------- + +feast.infra.offline\_stores.contrib.athena\_offline\_store.athena module +------------------------------------------------------------------------ + +.. automodule:: feast.infra.offline_stores.contrib.athena_offline_store.athena + :members: + :undoc-members: + :show-inheritance: + +feast.infra.offline\_stores.contrib.athena\_offline\_store.athena\_source module +-------------------------------------------------------------------------------- + +.. automodule:: feast.infra.offline_stores.contrib.athena_offline_store.athena_source + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: feast.infra.offline_stores.contrib.athena_offline_store + :members: + :undoc-members: + :show-inheritance: diff --git a/sdk/python/docs/source/feast.infra.offline_stores.contrib.athena_offline_store.tests.rst b/sdk/python/docs/source/feast.infra.offline_stores.contrib.athena_offline_store.tests.rst new file mode 100644 index 0000000000..47a8f83e2b --- /dev/null +++ b/sdk/python/docs/source/feast.infra.offline_stores.contrib.athena_offline_store.tests.rst @@ -0,0 +1,21 @@ +feast.infra.offline\_stores.contrib.athena\_offline\_store.tests package +======================================================================== + +Submodules +---------- + +feast.infra.offline\_stores.contrib.athena\_offline\_store.tests.data\_source module +------------------------------------------------------------------------------------ + +.. automodule:: feast.infra.offline_stores.contrib.athena_offline_store.tests.data_source + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: feast.infra.offline_stores.contrib.athena_offline_store.tests + :members: + :undoc-members: + :show-inheritance: diff --git a/sdk/python/docs/source/feast.infra.offline_stores.contrib.rst b/sdk/python/docs/source/feast.infra.offline_stores.contrib.rst index 80916046b5..5d1fbb3e0b 100644 --- a/sdk/python/docs/source/feast.infra.offline_stores.contrib.rst +++ b/sdk/python/docs/source/feast.infra.offline_stores.contrib.rst @@ -7,6 +7,7 @@ Subpackages .. toctree:: :maxdepth: 4 + feast.infra.offline_stores.contrib.athena_offline_store feast.infra.offline_stores.contrib.postgres_offline_store feast.infra.offline_stores.contrib.spark_offline_store feast.infra.offline_stores.contrib.trino_offline_store @@ -14,6 +15,14 @@ Subpackages Submodules ---------- +feast.infra.offline\_stores.contrib.athena\_repo\_configuration module +---------------------------------------------------------------------- + +.. automodule:: feast.infra.offline_stores.contrib.athena_repo_configuration + :members: + :undoc-members: + :show-inheritance: + feast.infra.offline\_stores.contrib.postgres\_repo\_configuration module ------------------------------------------------------------------------ diff --git a/sdk/python/docs/source/feast.infra.online_stores.contrib.rst b/sdk/python/docs/source/feast.infra.online_stores.contrib.rst index ca07b356d6..6afe9071ac 100644 --- a/sdk/python/docs/source/feast.infra.online_stores.contrib.rst +++ b/sdk/python/docs/source/feast.infra.online_stores.contrib.rst @@ -37,6 +37,14 @@ feast.infra.online\_stores.contrib.postgres module :undoc-members: :show-inheritance: +feast.infra.online\_stores.contrib.postgres\_repo\_configuration module +----------------------------------------------------------------------- + +.. automodule:: feast.infra.online_stores.contrib.postgres_repo_configuration + :members: + :undoc-members: + :show-inheritance: + Module contents --------------- diff --git a/sdk/python/docs/source/feast.infra.registry.rst b/sdk/python/docs/source/feast.infra.registry.rst new file mode 100644 index 0000000000..7a2d968997 --- /dev/null +++ b/sdk/python/docs/source/feast.infra.registry.rst @@ -0,0 +1,69 @@ +feast.infra.registry package +============================ + +Submodules +---------- + +feast.infra.registry.base\_registry module +------------------------------------------ + +.. automodule:: feast.infra.registry.base_registry + :members: + :undoc-members: + :show-inheritance: + +feast.infra.registry.file module +-------------------------------- + +.. automodule:: feast.infra.registry.file + :members: + :undoc-members: + :show-inheritance: + +feast.infra.registry.gcs module +------------------------------- + +.. automodule:: feast.infra.registry.gcs + :members: + :undoc-members: + :show-inheritance: + +feast.infra.registry.registry module +------------------------------------ + +.. automodule:: feast.infra.registry.registry + :members: + :undoc-members: + :show-inheritance: + +feast.infra.registry.registry\_store module +------------------------------------------- + +.. automodule:: feast.infra.registry.registry_store + :members: + :undoc-members: + :show-inheritance: + +feast.infra.registry.s3 module +------------------------------ + +.. automodule:: feast.infra.registry.s3 + :members: + :undoc-members: + :show-inheritance: + +feast.infra.registry.sql module +------------------------------- + +.. automodule:: feast.infra.registry.sql + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: feast.infra.registry + :members: + :undoc-members: + :show-inheritance: diff --git a/sdk/python/docs/source/feast.infra.rst b/sdk/python/docs/source/feast.infra.rst index 42c7d1334b..96c0828451 100644 --- a/sdk/python/docs/source/feast.infra.rst +++ b/sdk/python/docs/source/feast.infra.rst @@ -10,7 +10,7 @@ Subpackages feast.infra.materialization feast.infra.offline_stores feast.infra.online_stores - feast.infra.registry_stores + feast.infra.registry feast.infra.utils Submodules diff --git a/sdk/python/docs/source/feast.rst b/sdk/python/docs/source/feast.rst index 11fbde83e5..b0ed92c4cc 100644 --- a/sdk/python/docs/source/feast.rst +++ b/sdk/python/docs/source/feast.rst @@ -241,22 +241,6 @@ feast.proto\_json module :undoc-members: :show-inheritance: -feast.registry module ---------------------- - -.. automodule:: feast.registry - :members: - :undoc-members: - :show-inheritance: - -feast.registry\_store module ----------------------------- - -.. automodule:: feast.registry_store - :members: - :undoc-members: - :show-inheritance: - feast.repo\_config module ------------------------- diff --git a/sdk/python/docs/source/index.rst b/sdk/python/docs/source/index.rst index acee151d3e..8c54184285 100644 --- a/sdk/python/docs/source/index.rst +++ b/sdk/python/docs/source/index.rst @@ -96,6 +96,12 @@ Entity Feature View ================== +.. automodule:: feast.base_feature_view + :members: + +Feature View +---------------------- + .. automodule:: feast.feature_view :members: @@ -128,32 +134,59 @@ Feature Service Registry ================== -.. automodule:: feast.registry +.. automodule:: feast.infra.registry.base_registry + :inherited-members: + :members: + +Registry +---------------------- + +.. automodule:: feast.infra.registry.registry + :inherited-members: + :members: + +SQL Registry +---------------------- + +.. automodule:: feast.infra.registry.sql :inherited-members: :members: Registry Store ================== -.. automodule:: feast.registry_store +.. automodule:: feast.infra.registry.registry_store :inherited-members: :members: :exclude-members: NoopRegistryStore -SQL Registry Store +File Registry Store ----------------------- -.. automodule:: feast.infra.registry_stores.sql +.. automodule:: feast.infra.registry.file :members: :noindex: -PostgreSQL Registry Store +GCS Registry Store ----------------------- -.. automodule:: feast.infra.registry_stores.contrib.postgres.registry_store +.. automodule:: feast.infra.registry.gcs :members: :noindex: +S3 Registry Store +----------------------- + +.. automodule:: feast.infra.registry.s3 + :members: + :noindex: + +PostgreSQL Registry Store +----------------------- + +.. automodule:: feast.infra.registry.contrib.postgres.postgres_registry_store + :members: + :noindex: Provider ================== @@ -173,21 +206,18 @@ Local Provider .. automodule:: feast.infra.local :members: - :exclude-members: LocalRegistryStore GCP Provider ------------------ .. automodule:: feast.infra.gcp :members: - :exclude-members: GCSRegistryStore AWS Provider ------------------ .. automodule:: feast.infra.aws :members: - :exclude-members: S3RegistryStore Offline Store ================== diff --git a/sdk/python/feast/diff/registry_diff.py b/sdk/python/feast/diff/registry_diff.py index 37c8af9155..15f880e392 100644 --- a/sdk/python/feast/diff/registry_diff.py +++ b/sdk/python/feast/diff/registry_diff.py @@ -8,6 +8,8 @@ from feast.feast_object import FeastObject, FeastObjectSpecProto from feast.feature_service import FeatureService from feast.feature_view import DUMMY_ENTITY_NAME +from feast.infra.registry.base_registry import BaseRegistry +from feast.infra.registry.registry import FEAST_OBJECT_TYPES, FeastObjectType from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto from feast.protos.feast.core.Entity_pb2 import Entity as EntityProto from feast.protos.feast.core.FeatureService_pb2 import ( @@ -27,7 +29,6 @@ from feast.protos.feast.core.ValidationProfile_pb2 import ( ValidationReference as ValidationReferenceProto, ) -from feast.registry import FEAST_OBJECT_TYPES, BaseRegistry, FeastObjectType from feast.repo_contents import RepoContents diff --git a/sdk/python/feast/feature_logging.py b/sdk/python/feast/feature_logging.py index da9a0c9fe5..bd45c09b0a 100644 --- a/sdk/python/feast/feature_logging.py +++ b/sdk/python/feast/feature_logging.py @@ -18,7 +18,7 @@ if TYPE_CHECKING: from feast.feature_service import FeatureService - from feast.registry import BaseRegistry + from feast.infra.registry.base_registry import BaseRegistry REQUEST_ID_FIELD = "__request_id" diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index ca9ce1e120..76684bf18a 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -77,7 +77,9 @@ ) from feast.infra.infra_object import Infra from feast.infra.provider import Provider, RetrievalJob, get_provider -from feast.infra.registry_stores.sql import SqlRegistry +from feast.infra.registry.base_registry import BaseRegistry +from feast.infra.registry.registry import Registry +from feast.infra.registry.sql import SqlRegistry from feast.on_demand_feature_view import OnDemandFeatureView from feast.online_response import OnlineResponse from feast.protos.feast.serving.ServingService_pb2 import ( @@ -86,7 +88,6 @@ ) from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import RepeatedValue, Value -from feast.registry import BaseRegistry, Registry from feast.repo_config import RepoConfig, load_repo_config from feast.repo_contents import RepoContents from feast.request_feature_view import RequestFeatureView diff --git a/sdk/python/feast/infra/aws.py b/sdk/python/feast/infra/aws.py index 523848f2f2..f334998e6b 100644 --- a/sdk/python/feast/infra/aws.py +++ b/sdk/python/feast/infra/aws.py @@ -4,11 +4,7 @@ import os import uuid import warnings -from datetime import datetime -from pathlib import Path -from tempfile import TemporaryFile from typing import Optional, Sequence -from urllib.parse import urlparse from colorama import Fore, Style @@ -25,17 +21,13 @@ AwsLambdaDoesNotExist, IncompatibleRegistryStoreClass, RepoConfigPathDoesNotExist, - S3RegistryBucketForbiddenAccess, - S3RegistryBucketNotExist, ) from feast.feature_view import FeatureView from feast.infra.feature_servers.aws_lambda.config import AwsLambdaFeatureServerConfig from feast.infra.passthrough_provider import PassthroughProvider +from feast.infra.registry.registry import get_registry_store_class_from_scheme +from feast.infra.registry.s3 import S3RegistryStore from feast.infra.utils import aws_utils -from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto -from feast.registry import get_registry_store_class_from_scheme -from feast.registry_store import RegistryStore -from feast.repo_config import RegistryConfig from feast.usage import log_exceptions_and_usage from feast.version import get_version @@ -362,64 +354,3 @@ def _get_docker_image_version() -> str: "> pip install -e '.'" ) return version - - -class S3RegistryStore(RegistryStore): - def __init__(self, registry_config: RegistryConfig, repo_path: Path): - uri = registry_config.path - self._uri = urlparse(uri) - self._bucket = self._uri.hostname - self._key = self._uri.path.lstrip("/") - - self.s3_client = boto3.resource( - "s3", endpoint_url=os.environ.get("FEAST_S3_ENDPOINT_URL") - ) - - @log_exceptions_and_usage(registry="s3") - def get_registry_proto(self): - file_obj = TemporaryFile() - registry_proto = RegistryProto() - try: - from botocore.exceptions import ClientError - except ImportError as e: - from feast.errors import FeastExtrasDependencyImportError - - raise FeastExtrasDependencyImportError("aws", str(e)) - try: - bucket = self.s3_client.Bucket(self._bucket) - self.s3_client.meta.client.head_bucket(Bucket=bucket.name) - except ClientError as e: - # If a client error is thrown, then check that it was a 404 error. - # If it was a 404 error, then the bucket does not exist. - error_code = int(e.response["Error"]["Code"]) - if error_code == 404: - raise S3RegistryBucketNotExist(self._bucket) - else: - raise S3RegistryBucketForbiddenAccess(self._bucket) from e - - try: - obj = bucket.Object(self._key) - obj.download_fileobj(file_obj) - file_obj.seek(0) - registry_proto.ParseFromString(file_obj.read()) - return registry_proto - except ClientError as e: - raise FileNotFoundError( - f"Error while trying to locate Registry at path {self._uri.geturl()}" - ) from e - - @log_exceptions_and_usage(registry="s3") - def update_registry_proto(self, registry_proto: RegistryProto): - self._write_registry(registry_proto) - - def teardown(self): - self.s3_client.Object(self._bucket, self._key).delete() - - def _write_registry(self, registry_proto: RegistryProto): - registry_proto.version_id = str(uuid.uuid4()) - registry_proto.last_updated.FromDatetime(datetime.utcnow()) - # we have already checked the bucket exists so no need to do it again - file_obj = TemporaryFile() - file_obj.write(registry_proto.SerializeToString()) - file_obj.seek(0) - self.s3_client.Bucket(self._bucket).put_object(Body=file_obj, Key=self._key) diff --git a/sdk/python/feast/infra/gcp.py b/sdk/python/feast/infra/gcp.py index 257ae38d02..512378237a 100644 --- a/sdk/python/feast/infra/gcp.py +++ b/sdk/python/feast/infra/gcp.py @@ -1,14 +1,4 @@ -import uuid -from datetime import datetime -from pathlib import Path -from tempfile import TemporaryFile -from urllib.parse import urlparse - from feast.infra.passthrough_provider import PassthroughProvider -from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto -from feast.registry_store import RegistryStore -from feast.repo_config import RegistryConfig -from feast.usage import log_exceptions_and_usage class GcpProvider(PassthroughProvider): @@ -17,68 +7,3 @@ class GcpProvider(PassthroughProvider): """ pass - - -class GCSRegistryStore(RegistryStore): - def __init__(self, registry_config: RegistryConfig, repo_path: Path): - uri = registry_config.path - try: - import google.cloud.storage as storage - except ImportError as e: - from feast.errors import FeastExtrasDependencyImportError - - raise FeastExtrasDependencyImportError("gcp", str(e)) - - self.gcs_client = storage.Client() - self._uri = urlparse(uri) - self._bucket = self._uri.hostname - self._blob = self._uri.path.lstrip("/") - - @log_exceptions_and_usage(registry="gs") - def get_registry_proto(self): - import google.cloud.storage as storage - from google.cloud.exceptions import NotFound - - file_obj = TemporaryFile() - registry_proto = RegistryProto() - try: - bucket = self.gcs_client.get_bucket(self._bucket) - except NotFound: - raise Exception( - f"No bucket named {self._bucket} exists; please create it first." - ) - if storage.Blob(bucket=bucket, name=self._blob).exists(self.gcs_client): - self.gcs_client.download_blob_to_file( - self._uri.geturl(), file_obj, timeout=30 - ) - file_obj.seek(0) - registry_proto.ParseFromString(file_obj.read()) - return registry_proto - raise FileNotFoundError( - f'Registry not found at path "{self._uri.geturl()}". Have you run "feast apply"?' - ) - - @log_exceptions_and_usage(registry="gs") - def update_registry_proto(self, registry_proto: RegistryProto): - self._write_registry(registry_proto) - - def teardown(self): - from google.cloud.exceptions import NotFound - - gs_bucket = self.gcs_client.get_bucket(self._bucket) - try: - gs_bucket.delete_blob(self._blob) - except NotFound: - # If the blob deletion fails with NotFound, it has already been deleted. - pass - - def _write_registry(self, registry_proto: RegistryProto): - registry_proto.version_id = str(uuid.uuid4()) - registry_proto.last_updated.FromDatetime(datetime.utcnow()) - # we have already checked the bucket exists so no need to do it again - gs_bucket = self.gcs_client.get_bucket(self._bucket) - blob = gs_bucket.blob(self._blob) - file_obj = TemporaryFile() - file_obj.write(registry_proto.SerializeToString()) - file_obj.seek(0) - blob.upload_from_file(file_obj) diff --git a/sdk/python/feast/infra/local.py b/sdk/python/feast/infra/local.py index 7249d247a2..1226ceaf37 100644 --- a/sdk/python/feast/infra/local.py +++ b/sdk/python/feast/infra/local.py @@ -1,14 +1,9 @@ -import uuid -from datetime import datetime -from pathlib import Path from typing import List from feast.infra.infra_object import Infra, InfraObject from feast.infra.passthrough_provider import PassthroughProvider from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto -from feast.registry_store import RegistryStore -from feast.repo_config import RegistryConfig, RepoConfig -from feast.usage import log_exceptions_and_usage +from feast.repo_config import RepoConfig class LocalProvider(PassthroughProvider): @@ -26,42 +21,3 @@ def plan_infra( ) infra.infra_objects += infra_objects return infra - - -class LocalRegistryStore(RegistryStore): - def __init__(self, registry_config: RegistryConfig, repo_path: Path): - registry_path = Path(registry_config.path) - if registry_path.is_absolute(): - self._filepath = registry_path - else: - self._filepath = repo_path.joinpath(registry_path) - - @log_exceptions_and_usage(registry="local") - def get_registry_proto(self): - registry_proto = RegistryProto() - if self._filepath.exists(): - registry_proto.ParseFromString(self._filepath.read_bytes()) - return registry_proto - raise FileNotFoundError( - f'Registry not found at path "{self._filepath}". Have you run "feast apply"?' - ) - - @log_exceptions_and_usage(registry="local") - def update_registry_proto(self, registry_proto: RegistryProto): - self._write_registry(registry_proto) - - def teardown(self): - try: - self._filepath.unlink() - except FileNotFoundError: - # If the file deletion fails with FileNotFoundError, the file has already - # been deleted. - pass - - def _write_registry(self, registry_proto: RegistryProto): - registry_proto.version_id = str(uuid.uuid4()) - registry_proto.last_updated.FromDatetime(datetime.utcnow()) - file_dir = self._filepath.parent - file_dir.mkdir(exist_ok=True) - with open(self._filepath, mode="wb", buffering=0) as f: - f.write(registry_proto.SerializeToString()) diff --git a/sdk/python/feast/infra/materialization/batch_materialization_engine.py b/sdk/python/feast/infra/materialization/batch_materialization_engine.py index 1890ffed5a..3b7c40ff99 100644 --- a/sdk/python/feast/infra/materialization/batch_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/batch_materialization_engine.py @@ -11,7 +11,7 @@ from feast.feature_view import FeatureView from feast.infra.offline_stores.offline_store import OfflineStore from feast.infra.online_stores.online_store import OnlineStore -from feast.registry import BaseRegistry +from feast.infra.registry.base_registry import BaseRegistry from feast.repo_config import RepoConfig from feast.stream_feature_view import StreamFeatureView diff --git a/sdk/python/feast/infra/materialization/lambda/lambda_engine.py b/sdk/python/feast/infra/materialization/lambda/lambda_engine.py index 69986ca6e1..b8bea46dee 100644 --- a/sdk/python/feast/infra/materialization/lambda/lambda_engine.py +++ b/sdk/python/feast/infra/materialization/lambda/lambda_engine.py @@ -22,7 +22,7 @@ ) from feast.infra.offline_stores.offline_store import OfflineStore from feast.infra.online_stores.online_store import OnlineStore -from feast.registry import BaseRegistry +from feast.infra.registry.base_registry import BaseRegistry from feast.repo_config import FeastConfigBaseModel, RepoConfig from feast.stream_feature_view import StreamFeatureView from feast.utils import _get_column_names diff --git a/sdk/python/feast/infra/materialization/local_engine.py b/sdk/python/feast/infra/materialization/local_engine.py index 4f775981ef..88bd959be9 100644 --- a/sdk/python/feast/infra/materialization/local_engine.py +++ b/sdk/python/feast/infra/materialization/local_engine.py @@ -9,10 +9,10 @@ from feast.feature_view import FeatureView from feast.infra.offline_stores.offline_store import OfflineStore from feast.infra.online_stores.online_store import OnlineStore +from feast.infra.registry.base_registry import BaseRegistry from feast.repo_config import FeastConfigBaseModel, RepoConfig from feast.stream_feature_view import StreamFeatureView -from ...registry import BaseRegistry from ...utils import ( _convert_arrow_to_proto, _get_column_names, diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 51780015e0..da19bff5ec 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -39,8 +39,8 @@ RetrievalJob, RetrievalMetadata, ) +from feast.infra.registry.base_registry import BaseRegistry from feast.on_demand_feature_view import OnDemandFeatureView -from feast.registry import BaseRegistry from feast.repo_config import FeastConfigBaseModel, RepoConfig from ...saved_dataset import SavedDatasetStorage diff --git a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py index daa99789aa..d7f11fb39f 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py +++ b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py @@ -37,8 +37,9 @@ RetrievalJob, RetrievalMetadata, ) +from feast.infra.registry.base_registry import BaseRegistry +from feast.infra.registry.registry import Registry from feast.infra.utils import aws_utils -from feast.registry import BaseRegistry, Registry from feast.repo_config import FeastConfigBaseModel, RepoConfig from feast.saved_dataset import SavedDatasetStorage from feast.usage import log_exceptions_and_usage diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index 1339bdcac1..1347f8b37c 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -34,6 +34,7 @@ RetrievalJob, RetrievalMetadata, ) +from feast.infra.registry.registry import Registry from feast.infra.utils.postgres.connection_utils import ( _get_conn, df_to_postgres_table, @@ -41,7 +42,6 @@ ) from feast.infra.utils.postgres.postgres_config import PostgreSQLConfig from feast.on_demand_feature_view import OnDemandFeatureView -from feast.registry import Registry from feast.repo_config import RepoConfig from feast.saved_dataset import SavedDatasetStorage from feast.type_map import pg_type_code_to_arrow diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py index 21a00a6c5a..26d414232f 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py @@ -28,7 +28,7 @@ RetrievalJob, RetrievalMetadata, ) -from feast.registry import Registry +from feast.infra.registry.registry import Registry from feast.repo_config import FeastConfigBaseModel, RepoConfig from feast.saved_dataset import SavedDatasetStorage from feast.type_map import spark_schema_to_np_dtypes diff --git a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py index 5a5afc9341..b5f0b1f950 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py +++ b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py @@ -25,8 +25,8 @@ RetrievalJob, RetrievalMetadata, ) +from feast.infra.registry.registry import Registry from feast.on_demand_feature_view import OnDemandFeatureView -from feast.registry import Registry from feast.repo_config import FeastConfigBaseModel, RepoConfig from feast.saved_dataset import SavedDatasetStorage from feast.usage import log_exceptions_and_usage diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index f621070569..ca945c3ff3 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -29,7 +29,7 @@ DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL, get_pyarrow_schema_from_batch_source, ) -from feast.registry import BaseRegistry +from feast.infra.registry.base_registry import BaseRegistry from feast.repo_config import FeastConfigBaseModel, RepoConfig from feast.saved_dataset import SavedDatasetStorage from feast.usage import log_exceptions_and_usage diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index b290fc94ca..741b97e2fd 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -25,8 +25,8 @@ from feast.dqm.errors import ValidationFailed from feast.feature_logging import LoggingConfig, LoggingSource from feast.feature_view import FeatureView +from feast.infra.registry.base_registry import BaseRegistry from feast.on_demand_feature_view import OnDemandFeatureView -from feast.registry import BaseRegistry from feast.repo_config import RepoConfig from feast.saved_dataset import SavedDatasetStorage diff --git a/sdk/python/feast/infra/offline_stores/offline_utils.py b/sdk/python/feast/infra/offline_stores/offline_utils.py index 829d46c5ca..42b8f8497a 100644 --- a/sdk/python/feast/infra/offline_stores/offline_utils.py +++ b/sdk/python/feast/infra/offline_stores/offline_utils.py @@ -17,7 +17,7 @@ from feast.feature_view import FeatureView from feast.importer import import_class from feast.infra.offline_stores.offline_store import OfflineStore -from feast.registry import BaseRegistry +from feast.infra.registry.base_registry import BaseRegistry from feast.repo_config import RepoConfig from feast.type_map import feast_value_type_to_pa from feast.utils import _get_requested_feature_views_to_features_dict, to_naive_utc diff --git a/sdk/python/feast/infra/offline_stores/redshift.py b/sdk/python/feast/infra/offline_stores/redshift.py index f6de19fd74..2acf06017d 100644 --- a/sdk/python/feast/infra/offline_stores/redshift.py +++ b/sdk/python/feast/infra/offline_stores/redshift.py @@ -38,8 +38,8 @@ RedshiftLoggingDestination, SavedDatasetRedshiftStorage, ) +from feast.infra.registry.base_registry import BaseRegistry from feast.infra.utils import aws_utils -from feast.registry import BaseRegistry from feast.repo_config import FeastConfigBaseModel, RepoConfig from feast.saved_dataset import SavedDatasetStorage from feast.usage import log_exceptions_and_usage diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index 34bfb739d1..98db97b179 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -40,13 +40,13 @@ SnowflakeLoggingDestination, SnowflakeSource, ) +from feast.infra.registry.base_registry import BaseRegistry from feast.infra.utils.snowflake_utils import ( execute_snowflake_statement, get_snowflake_conn, write_pandas, write_parquet, ) -from feast.registry import BaseRegistry from feast.repo_config import FeastConfigBaseModel, RepoConfig from feast.saved_dataset import SavedDatasetStorage from feast.usage import log_exceptions_and_usage diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index d8a1641783..809afb6c16 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -19,9 +19,9 @@ from feast.infra.offline_stores.offline_utils import get_offline_store_from_config from feast.infra.online_stores.helpers import get_online_store_from_config from feast.infra.provider import Provider +from feast.infra.registry.base_registry import BaseRegistry from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto -from feast.registry import BaseRegistry from feast.repo_config import BATCH_ENGINE_CLASS_FOR_TYPE, RepoConfig from feast.saved_dataset import SavedDataset from feast.stream_feature_view import StreamFeatureView diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index c5f9380677..bf2a4ec7bb 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -13,10 +13,10 @@ from feast.importer import import_class from feast.infra.infra_object import Infra from feast.infra.offline_stores.offline_store import RetrievalJob +from feast.infra.registry.base_registry import BaseRegistry from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto -from feast.registry import BaseRegistry from feast.repo_config import RepoConfig from feast.saved_dataset import SavedDataset diff --git a/sdk/python/feast/infra/registry_stores/__init__.py b/sdk/python/feast/infra/registry/__init__.py similarity index 100% rename from sdk/python/feast/infra/registry_stores/__init__.py rename to sdk/python/feast/infra/registry/__init__.py diff --git a/sdk/python/feast/infra/registry/base_registry.py b/sdk/python/feast/infra/registry/base_registry.py new file mode 100644 index 0000000000..5edfae3472 --- /dev/null +++ b/sdk/python/feast/infra/registry/base_registry.py @@ -0,0 +1,647 @@ +# Copyright 2019 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import json +from abc import ABC, abstractmethod +from collections import defaultdict +from datetime import datetime +from typing import Any, Dict, List, Optional + +from google.protobuf.json_format import MessageToJson +from proto import Message + +from feast.base_feature_view import BaseFeatureView +from feast.data_source import DataSource +from feast.entity import Entity +from feast.feature_service import FeatureService +from feast.feature_view import FeatureView +from feast.infra.infra_object import Infra +from feast.on_demand_feature_view import OnDemandFeatureView +from feast.project_metadata import ProjectMetadata +from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto +from feast.request_feature_view import RequestFeatureView +from feast.saved_dataset import SavedDataset, ValidationReference +from feast.stream_feature_view import StreamFeatureView + + +class BaseRegistry(ABC): + """ + The interface that Feast uses to apply, list, retrieve, and delete Feast objects (e.g. entities, + feature views, and data sources). + """ + + # Entity operations + @abstractmethod + def apply_entity(self, entity: Entity, project: str, commit: bool = True): + """ + Registers a single entity with Feast + + Args: + entity: Entity that will be registered + project: Feast project that this entity belongs to + commit: Whether the change should be persisted immediately + """ + + @abstractmethod + def delete_entity(self, name: str, project: str, commit: bool = True): + """ + Deletes an entity or raises an exception if not found. + + Args: + name: Name of entity + project: Feast project that this entity belongs to + commit: Whether the change should be persisted immediately + """ + + @abstractmethod + def get_entity(self, name: str, project: str, allow_cache: bool = False) -> Entity: + """ + Retrieves an entity. + + Args: + name: Name of entity + project: Feast project that this entity belongs to + allow_cache: Whether to allow returning this entity from a cached registry + + Returns: + Returns either the specified entity, or raises an exception if + none is found + """ + + @abstractmethod + def list_entities(self, project: str, allow_cache: bool = False) -> List[Entity]: + """ + Retrieve a list of entities from the registry + + Args: + allow_cache: Whether to allow returning entities from a cached registry + project: Filter entities based on project name + + Returns: + List of entities + """ + + # Data source operations + @abstractmethod + def apply_data_source( + self, data_source: DataSource, project: str, commit: bool = True + ): + """ + Registers a single data source with Feast + + Args: + data_source: A data source that will be registered + project: Feast project that this data source belongs to + commit: Whether to immediately commit to the registry + """ + + @abstractmethod + def delete_data_source(self, name: str, project: str, commit: bool = True): + """ + Deletes a data source or raises an exception if not found. + + Args: + name: Name of data source + project: Feast project that this data source belongs to + commit: Whether the change should be persisted immediately + """ + + @abstractmethod + def get_data_source( + self, name: str, project: str, allow_cache: bool = False + ) -> DataSource: + """ + Retrieves a data source. + + Args: + name: Name of data source + project: Feast project that this data source belongs to + allow_cache: Whether to allow returning this data source from a cached registry + + Returns: + Returns either the specified data source, or raises an exception if none is found + """ + + @abstractmethod + def list_data_sources( + self, project: str, allow_cache: bool = False + ) -> List[DataSource]: + """ + Retrieve a list of data sources from the registry + + Args: + project: Filter data source based on project name + allow_cache: Whether to allow returning data sources from a cached registry + + Returns: + List of data sources + """ + + # Feature service operations + @abstractmethod + def apply_feature_service( + self, feature_service: FeatureService, project: str, commit: bool = True + ): + """ + Registers a single feature service with Feast + + Args: + feature_service: A feature service that will be registered + project: Feast project that this entity belongs to + """ + + @abstractmethod + def delete_feature_service(self, name: str, project: str, commit: bool = True): + """ + Deletes a feature service or raises an exception if not found. + + Args: + name: Name of feature service + project: Feast project that this feature service belongs to + commit: Whether the change should be persisted immediately + """ + + @abstractmethod + def get_feature_service( + self, name: str, project: str, allow_cache: bool = False + ) -> FeatureService: + """ + Retrieves a feature service. + + Args: + name: Name of feature service + project: Feast project that this feature service belongs to + allow_cache: Whether to allow returning this feature service from a cached registry + + Returns: + Returns either the specified feature service, or raises an exception if + none is found + """ + + @abstractmethod + def list_feature_services( + self, project: str, allow_cache: bool = False + ) -> List[FeatureService]: + """ + Retrieve a list of feature services from the registry + + Args: + allow_cache: Whether to allow returning entities from a cached registry + project: Filter entities based on project name + + Returns: + List of feature services + """ + + # Feature view operations + @abstractmethod + def apply_feature_view( + self, feature_view: BaseFeatureView, project: str, commit: bool = True + ): + """ + Registers a single feature view with Feast + + Args: + feature_view: Feature view that will be registered + project: Feast project that this feature view belongs to + commit: Whether the change should be persisted immediately + """ + + @abstractmethod + def delete_feature_view(self, name: str, project: str, commit: bool = True): + """ + Deletes a feature view or raises an exception if not found. + + Args: + name: Name of feature view + project: Feast project that this feature view belongs to + commit: Whether the change should be persisted immediately + """ + + # stream feature view operations + @abstractmethod + def get_stream_feature_view( + self, name: str, project: str, allow_cache: bool = False + ): + """ + Retrieves a stream feature view. + + Args: + name: Name of stream feature view + project: Feast project that this feature view belongs to + allow_cache: Allow returning feature view from the cached registry + + Returns: + Returns either the specified feature view, or raises an exception if + none is found + """ + + @abstractmethod + def list_stream_feature_views( + self, project: str, allow_cache: bool = False + ) -> List[StreamFeatureView]: + """ + Retrieve a list of stream feature views from the registry + + Args: + project: Filter stream feature views based on project name + allow_cache: Whether to allow returning stream feature views from a cached registry + + Returns: + List of stream feature views + """ + + # on demand feature view operations + @abstractmethod + def get_on_demand_feature_view( + self, name: str, project: str, allow_cache: bool = False + ) -> OnDemandFeatureView: + """ + Retrieves an on demand feature view. + + Args: + name: Name of on demand feature view + project: Feast project that this on demand feature view belongs to + allow_cache: Whether to allow returning this on demand feature view from a cached registry + + Returns: + Returns either the specified on demand feature view, or raises an exception if + none is found + """ + + @abstractmethod + def list_on_demand_feature_views( + self, project: str, allow_cache: bool = False + ) -> List[OnDemandFeatureView]: + """ + Retrieve a list of on demand feature views from the registry + + Args: + project: Filter on demand feature views based on project name + allow_cache: Whether to allow returning on demand feature views from a cached registry + + Returns: + List of on demand feature views + """ + + # regular feature view operations + @abstractmethod + def get_feature_view( + self, name: str, project: str, allow_cache: bool = False + ) -> FeatureView: + """ + Retrieves a feature view. + + Args: + name: Name of feature view + project: Feast project that this feature view belongs to + allow_cache: Allow returning feature view from the cached registry + + Returns: + Returns either the specified feature view, or raises an exception if + none is found + """ + + @abstractmethod + def list_feature_views( + self, project: str, allow_cache: bool = False + ) -> List[FeatureView]: + """ + Retrieve a list of feature views from the registry + + Args: + allow_cache: Allow returning feature views from the cached registry + project: Filter feature views based on project name + + Returns: + List of feature views + """ + + # request feature view operations + @abstractmethod + def get_request_feature_view(self, name: str, project: str) -> RequestFeatureView: + """ + Retrieves a request feature view. + + Args: + name: Name of request feature view + project: Feast project that this feature view belongs to + allow_cache: Allow returning feature view from the cached registry + + Returns: + Returns either the specified feature view, or raises an exception if + none is found + """ + + @abstractmethod + def list_request_feature_views( + self, project: str, allow_cache: bool = False + ) -> List[RequestFeatureView]: + """ + Retrieve a list of request feature views from the registry + + Args: + allow_cache: Allow returning feature views from the cached registry + project: Filter feature views based on project name + + Returns: + List of request feature views + """ + + @abstractmethod + def apply_materialization( + self, + feature_view: FeatureView, + project: str, + start_date: datetime, + end_date: datetime, + commit: bool = True, + ): + """ + Updates materialization intervals tracked for a single feature view in Feast + + Args: + feature_view: Feature view that will be updated with an additional materialization interval tracked + project: Feast project that this feature view belongs to + start_date (datetime): Start date of the materialization interval to track + end_date (datetime): End date of the materialization interval to track + commit: Whether the change should be persisted immediately + """ + + # Saved dataset operations + @abstractmethod + def apply_saved_dataset( + self, + saved_dataset: SavedDataset, + project: str, + commit: bool = True, + ): + """ + Stores a saved dataset metadata with Feast + + Args: + saved_dataset: SavedDataset that will be added / updated to registry + project: Feast project that this dataset belongs to + commit: Whether the change should be persisted immediately + """ + + @abstractmethod + def get_saved_dataset( + self, name: str, project: str, allow_cache: bool = False + ) -> SavedDataset: + """ + Retrieves a saved dataset. + + Args: + name: Name of dataset + project: Feast project that this dataset belongs to + allow_cache: Whether to allow returning this dataset from a cached registry + + Returns: + Returns either the specified SavedDataset, or raises an exception if + none is found + """ + + def delete_saved_dataset(self, name: str, project: str, allow_cache: bool = False): + """ + Delete a saved dataset. + + Args: + name: Name of dataset + project: Feast project that this dataset belongs to + allow_cache: Whether to allow returning this dataset from a cached registry + + Returns: + Returns either the specified SavedDataset, or raises an exception if + none is found + """ + + @abstractmethod + def list_saved_datasets( + self, project: str, allow_cache: bool = False + ) -> List[SavedDataset]: + """ + Retrieves a list of all saved datasets in specified project + + Args: + project: Feast project + allow_cache: Whether to allow returning this dataset from a cached registry + + Returns: + Returns the list of SavedDatasets + """ + + # Validation reference operations + @abstractmethod + def apply_validation_reference( + self, + validation_reference: ValidationReference, + project: str, + commit: bool = True, + ): + """ + Persist a validation reference + + Args: + validation_reference: ValidationReference that will be added / updated to registry + project: Feast project that this dataset belongs to + commit: Whether the change should be persisted immediately + """ + + @abstractmethod + def delete_validation_reference(self, name: str, project: str, commit: bool = True): + """ + Deletes a validation reference or raises an exception if not found. + + Args: + name: Name of validation reference + project: Feast project that this object belongs to + commit: Whether the change should be persisted immediately + """ + + @abstractmethod + def get_validation_reference( + self, name: str, project: str, allow_cache: bool = False + ) -> ValidationReference: + """ + Retrieves a validation reference. + + Args: + name: Name of dataset + project: Feast project that this dataset belongs to + allow_cache: Whether to allow returning this dataset from a cached registry + + Returns: + Returns either the specified ValidationReference, or raises an exception if + none is found + """ + + # TODO: Needs to be implemented. + def list_validation_references( + self, project: str, allow_cache: bool = False + ) -> List[ValidationReference]: + + """ + Retrieve a list of validation references from the registry + + Args: + allow_cache: Allow returning feature views from the cached registry + project: Filter feature views based on project name + + Returns: + List of request feature views + """ + + def list_project_metadata( + self, project: str, allow_cache: bool = False + ) -> List[ProjectMetadata]: + """ + Retrieves project metadata + + Args: + project: Filter metadata based on project name + allow_cache: Allow returning feature views from the cached registry + + Returns: + List of project metadata + """ + + @abstractmethod + def update_infra(self, infra: Infra, project: str, commit: bool = True): + """ + Updates the stored Infra object. + + Args: + infra: The new Infra object to be stored. + project: Feast project that the Infra object refers to + commit: Whether the change should be persisted immediately + """ + + @abstractmethod + def get_infra(self, project: str, allow_cache: bool = False) -> Infra: + """ + Retrieves the stored Infra object. + + Args: + project: Feast project that the Infra object refers to + allow_cache: Whether to allow returning this entity from a cached registry + + Returns: + The stored Infra object. + """ + + @abstractmethod + def apply_user_metadata( + self, + project: str, + feature_view: BaseFeatureView, + metadata_bytes: Optional[bytes], + ): + ... + + @abstractmethod + def get_user_metadata( + self, project: str, feature_view: BaseFeatureView + ) -> Optional[bytes]: + ... + + @abstractmethod + def proto(self) -> RegistryProto: + """ + Retrieves a proto version of the registry. + + Returns: + The registry proto object. + """ + + @abstractmethod + def commit(self): + """Commits the state of the registry cache to the remote registry store.""" + + @abstractmethod + def refresh(self, project: Optional[str]): + """Refreshes the state of the registry cache by fetching the registry state from the remote registry store.""" + + @staticmethod + def _message_to_sorted_dict(message: Message) -> Dict[str, Any]: + return json.loads(MessageToJson(message, sort_keys=True)) + + def to_dict(self, project: str) -> Dict[str, List[Any]]: + """Returns a dictionary representation of the registry contents for the specified project. + + For each list in the dictionary, the elements are sorted by name, so this + method can be used to compare two registries. + + Args: + project: Feast project to convert to a dict + """ + registry_dict: Dict[str, Any] = defaultdict(list) + registry_dict["project"] = project + for project_metadata in sorted(self.list_project_metadata(project=project)): + registry_dict["projectMetadata"].append( + self._message_to_sorted_dict(project_metadata.to_proto()) + ) + for data_source in sorted( + self.list_data_sources(project=project), key=lambda ds: ds.name + ): + registry_dict["dataSources"].append( + self._message_to_sorted_dict(data_source.to_proto()) + ) + for entity in sorted( + self.list_entities(project=project), key=lambda entity: entity.name + ): + registry_dict["entities"].append( + self._message_to_sorted_dict(entity.to_proto()) + ) + for feature_view in sorted( + self.list_feature_views(project=project), + key=lambda feature_view: feature_view.name, + ): + registry_dict["featureViews"].append( + self._message_to_sorted_dict(feature_view.to_proto()) + ) + for feature_service in sorted( + self.list_feature_services(project=project), + key=lambda feature_service: feature_service.name, + ): + registry_dict["featureServices"].append( + self._message_to_sorted_dict(feature_service.to_proto()) + ) + for on_demand_feature_view in sorted( + self.list_on_demand_feature_views(project=project), + key=lambda on_demand_feature_view: on_demand_feature_view.name, + ): + odfv_dict = self._message_to_sorted_dict(on_demand_feature_view.to_proto()) + + odfv_dict["spec"]["userDefinedFunction"][ + "body" + ] = on_demand_feature_view.udf_string + registry_dict["onDemandFeatureViews"].append(odfv_dict) + for request_feature_view in sorted( + self.list_request_feature_views(project=project), + key=lambda request_feature_view: request_feature_view.name, + ): + registry_dict["requestFeatureViews"].append( + self._message_to_sorted_dict(request_feature_view.to_proto()) + ) + for saved_dataset in sorted( + self.list_saved_datasets(project=project), key=lambda item: item.name + ): + registry_dict["savedDatasets"].append( + self._message_to_sorted_dict(saved_dataset.to_proto()) + ) + for infra_object in sorted(self.get_infra(project=project).infra_objects): + registry_dict["infra"].append( + self._message_to_sorted_dict(infra_object.to_proto()) + ) + return registry_dict diff --git a/sdk/python/feast/infra/registry_stores/contrib/postgres/registry_store.py b/sdk/python/feast/infra/registry/contrib/postgres/postgres_registry_store.py similarity index 98% rename from sdk/python/feast/infra/registry_stores/contrib/postgres/registry_store.py rename to sdk/python/feast/infra/registry/contrib/postgres/postgres_registry_store.py index b3c0c6bd36..362ec9f485 100644 --- a/sdk/python/feast/infra/registry_stores/contrib/postgres/registry_store.py +++ b/sdk/python/feast/infra/registry/contrib/postgres/postgres_registry_store.py @@ -3,10 +3,10 @@ import psycopg2 from psycopg2 import sql +from feast.infra.registry.registry_store import RegistryStore from feast.infra.utils.postgres.connection_utils import _get_conn from feast.infra.utils.postgres.postgres_config import PostgreSQLConfig from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto -from feast.registry_store import RegistryStore from feast.repo_config import RegistryConfig diff --git a/sdk/python/feast/infra/registry/file.py b/sdk/python/feast/infra/registry/file.py new file mode 100644 index 0000000000..3ee75a7880 --- /dev/null +++ b/sdk/python/feast/infra/registry/file.py @@ -0,0 +1,47 @@ +import uuid +from datetime import datetime +from pathlib import Path + +from feast.infra.registry.registry_store import RegistryStore +from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto +from feast.repo_config import RegistryConfig +from feast.usage import log_exceptions_and_usage + + +class FileRegistryStore(RegistryStore): + def __init__(self, registry_config: RegistryConfig, repo_path: Path): + registry_path = Path(registry_config.path) + if registry_path.is_absolute(): + self._filepath = registry_path + else: + self._filepath = repo_path.joinpath(registry_path) + + @log_exceptions_and_usage(registry="local") + def get_registry_proto(self): + registry_proto = RegistryProto() + if self._filepath.exists(): + registry_proto.ParseFromString(self._filepath.read_bytes()) + return registry_proto + raise FileNotFoundError( + f'Registry not found at path "{self._filepath}". Have you run "feast apply"?' + ) + + @log_exceptions_and_usage(registry="local") + def update_registry_proto(self, registry_proto: RegistryProto): + self._write_registry(registry_proto) + + def teardown(self): + try: + self._filepath.unlink() + except FileNotFoundError: + # If the file deletion fails with FileNotFoundError, the file has already + # been deleted. + pass + + def _write_registry(self, registry_proto: RegistryProto): + registry_proto.version_id = str(uuid.uuid4()) + registry_proto.last_updated.FromDatetime(datetime.utcnow()) + file_dir = self._filepath.parent + file_dir.mkdir(exist_ok=True) + with open(self._filepath, mode="wb", buffering=0) as f: + f.write(registry_proto.SerializeToString()) diff --git a/sdk/python/feast/infra/registry/gcs.py b/sdk/python/feast/infra/registry/gcs.py new file mode 100644 index 0000000000..6f922d4ea2 --- /dev/null +++ b/sdk/python/feast/infra/registry/gcs.py @@ -0,0 +1,75 @@ +import uuid +from datetime import datetime +from pathlib import Path +from tempfile import TemporaryFile +from urllib.parse import urlparse + +from feast.infra.registry.registry_store import RegistryStore +from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto +from feast.repo_config import RegistryConfig +from feast.usage import log_exceptions_and_usage + + +class GCSRegistryStore(RegistryStore): + def __init__(self, registry_config: RegistryConfig, repo_path: Path): + uri = registry_config.path + try: + import google.cloud.storage as storage + except ImportError as e: + from feast.errors import FeastExtrasDependencyImportError + + raise FeastExtrasDependencyImportError("gcp", str(e)) + + self.gcs_client = storage.Client() + self._uri = urlparse(uri) + self._bucket = self._uri.hostname + self._blob = self._uri.path.lstrip("/") + + @log_exceptions_and_usage(registry="gs") + def get_registry_proto(self): + import google.cloud.storage as storage + from google.cloud.exceptions import NotFound + + file_obj = TemporaryFile() + registry_proto = RegistryProto() + try: + bucket = self.gcs_client.get_bucket(self._bucket) + except NotFound: + raise Exception( + f"No bucket named {self._bucket} exists; please create it first." + ) + if storage.Blob(bucket=bucket, name=self._blob).exists(self.gcs_client): + self.gcs_client.download_blob_to_file( + self._uri.geturl(), file_obj, timeout=30 + ) + file_obj.seek(0) + registry_proto.ParseFromString(file_obj.read()) + return registry_proto + raise FileNotFoundError( + f'Registry not found at path "{self._uri.geturl()}". Have you run "feast apply"?' + ) + + @log_exceptions_and_usage(registry="gs") + def update_registry_proto(self, registry_proto: RegistryProto): + self._write_registry(registry_proto) + + def teardown(self): + from google.cloud.exceptions import NotFound + + gs_bucket = self.gcs_client.get_bucket(self._bucket) + try: + gs_bucket.delete_blob(self._blob) + except NotFound: + # If the blob deletion fails with NotFound, it has already been deleted. + pass + + def _write_registry(self, registry_proto: RegistryProto): + registry_proto.version_id = str(uuid.uuid4()) + registry_proto.last_updated.FromDatetime(datetime.utcnow()) + # we have already checked the bucket exists so no need to do it again + gs_bucket = self.gcs_client.get_bucket(self._bucket) + blob = gs_bucket.blob(self._blob) + file_obj = TemporaryFile() + file_obj.write(registry_proto.SerializeToString()) + file_obj.seek(0) + blob.upload_from_file(file_obj) diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/infra/registry/registry.py similarity index 55% rename from sdk/python/feast/registry.py rename to sdk/python/feast/infra/registry/registry.py index b6613f467e..221b44141a 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/infra/registry/registry.py @@ -11,12 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import abc -import json import logging import uuid -from abc import abstractmethod -from collections import defaultdict from datetime import datetime, timedelta from enum import Enum from pathlib import Path @@ -25,7 +21,6 @@ from urllib.parse import urlparse from google.protobuf.internal.containers import RepeatedCompositeFieldContainer -from google.protobuf.json_format import MessageToJson from proto import Message from feast import usage @@ -47,11 +42,12 @@ from feast.feature_view import FeatureView from feast.importer import import_class from feast.infra.infra_object import Infra +from feast.infra.registry.base_registry import BaseRegistry +from feast.infra.registry.registry_store import NoopRegistryStore from feast.on_demand_feature_view import OnDemandFeatureView from feast.project_metadata import ProjectMetadata from feast.protos.feast.core.Registry_pb2 import ProjectMetadata as ProjectMetadataProto from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto -from feast.registry_store import NoopRegistryStore from feast.repo_config import RegistryConfig from feast.repo_contents import RepoContents from feast.request_feature_view import RequestFeatureView @@ -61,17 +57,17 @@ REGISTRY_SCHEMA_VERSION = "1" REGISTRY_STORE_CLASS_FOR_TYPE = { - "GCSRegistryStore": "feast.infra.gcp.GCSRegistryStore", - "S3RegistryStore": "feast.infra.aws.S3RegistryStore", - "LocalRegistryStore": "feast.infra.local.LocalRegistryStore", - "PostgreSQLRegistryStore": "feast.infra.registry_stores.contrib.postgres.registry_store.PostgreSQLRegistryStore", + "GCSRegistryStore": "feast.infra.registry.gcs.GCSRegistryStore", + "S3RegistryStore": "feast.infra.registry.s3.S3RegistryStore", + "FileRegistryStore": "feast.infra.registry.file.FileRegistryStore", + "PostgreSQLRegistryStore": "feast.infra.registry.contrib.postgres.postgres_registry_store.PostgreSQLRegistryStore", } REGISTRY_STORE_CLASS_FOR_SCHEME = { "gs": "GCSRegistryStore", "s3": "S3RegistryStore", - "file": "LocalRegistryStore", - "": "LocalRegistryStore", + "file": "FileRegistryStore", + "": "FileRegistryStore", } @@ -148,614 +144,6 @@ def get_registry_store_class_from_scheme(registry_path: str): return get_registry_store_class_from_type(registry_store_type) -class BaseRegistry(abc.ABC): - # Entity operations - @abstractmethod - def apply_entity(self, entity: Entity, project: str, commit: bool = True): - """ - Registers a single entity with Feast - - Args: - entity: Entity that will be registered - project: Feast project that this entity belongs to - commit: Whether the change should be persisted immediately - """ - - @abstractmethod - def delete_entity(self, name: str, project: str, commit: bool = True): - """ - Deletes an entity or raises an exception if not found. - - Args: - name: Name of entity - project: Feast project that this entity belongs to - commit: Whether the change should be persisted immediately - """ - - @abstractmethod - def get_entity(self, name: str, project: str, allow_cache: bool = False) -> Entity: - """ - Retrieves an entity. - - Args: - name: Name of entity - project: Feast project that this entity belongs to - allow_cache: Whether to allow returning this entity from a cached registry - - Returns: - Returns either the specified entity, or raises an exception if - none is found - """ - - @abstractmethod - def list_entities(self, project: str, allow_cache: bool = False) -> List[Entity]: - """ - Retrieve a list of entities from the registry - - Args: - allow_cache: Whether to allow returning entities from a cached registry - project: Filter entities based on project name - - Returns: - List of entities - """ - - # Data source operations - @abstractmethod - def apply_data_source( - self, data_source: DataSource, project: str, commit: bool = True - ): - """ - Registers a single data source with Feast - - Args: - data_source: A data source that will be registered - project: Feast project that this data source belongs to - commit: Whether to immediately commit to the registry - """ - - @abstractmethod - def delete_data_source(self, name: str, project: str, commit: bool = True): - """ - Deletes a data source or raises an exception if not found. - - Args: - name: Name of data source - project: Feast project that this data source belongs to - commit: Whether the change should be persisted immediately - """ - - @abstractmethod - def get_data_source( - self, name: str, project: str, allow_cache: bool = False - ) -> DataSource: - """ - Retrieves a data source. - - Args: - name: Name of data source - project: Feast project that this data source belongs to - allow_cache: Whether to allow returning this data source from a cached registry - - Returns: - Returns either the specified data source, or raises an exception if none is found - """ - - @abstractmethod - def list_data_sources( - self, project: str, allow_cache: bool = False - ) -> List[DataSource]: - """ - Retrieve a list of data sources from the registry - - Args: - project: Filter data source based on project name - allow_cache: Whether to allow returning data sources from a cached registry - - Returns: - List of data sources - """ - - # Feature service operations - @abstractmethod - def apply_feature_service( - self, feature_service: FeatureService, project: str, commit: bool = True - ): - """ - Registers a single feature service with Feast - - Args: - feature_service: A feature service that will be registered - project: Feast project that this entity belongs to - """ - - @abstractmethod - def delete_feature_service(self, name: str, project: str, commit: bool = True): - """ - Deletes a feature service or raises an exception if not found. - - Args: - name: Name of feature service - project: Feast project that this feature service belongs to - commit: Whether the change should be persisted immediately - """ - - @abstractmethod - def get_feature_service( - self, name: str, project: str, allow_cache: bool = False - ) -> FeatureService: - """ - Retrieves a feature service. - - Args: - name: Name of feature service - project: Feast project that this feature service belongs to - allow_cache: Whether to allow returning this feature service from a cached registry - - Returns: - Returns either the specified feature service, or raises an exception if - none is found - """ - - @abstractmethod - def list_feature_services( - self, project: str, allow_cache: bool = False - ) -> List[FeatureService]: - """ - Retrieve a list of feature services from the registry - - Args: - allow_cache: Whether to allow returning entities from a cached registry - project: Filter entities based on project name - - Returns: - List of feature services - """ - - # Feature view operations - @abstractmethod - def apply_feature_view( - self, feature_view: BaseFeatureView, project: str, commit: bool = True - ): - """ - Registers a single feature view with Feast - - Args: - feature_view: Feature view that will be registered - project: Feast project that this feature view belongs to - commit: Whether the change should be persisted immediately - """ - - @abstractmethod - def delete_feature_view(self, name: str, project: str, commit: bool = True): - """ - Deletes a feature view or raises an exception if not found. - - Args: - name: Name of feature view - project: Feast project that this feature view belongs to - commit: Whether the change should be persisted immediately - """ - - # stream feature view operations - @abstractmethod - def get_stream_feature_view( - self, name: str, project: str, allow_cache: bool = False - ): - """ - Retrieves a stream feature view. - - Args: - name: Name of stream feature view - project: Feast project that this feature view belongs to - allow_cache: Allow returning feature view from the cached registry - - Returns: - Returns either the specified feature view, or raises an exception if - none is found - """ - - @abstractmethod - def list_stream_feature_views( - self, project: str, allow_cache: bool = False - ) -> List[StreamFeatureView]: - """ - Retrieve a list of stream feature views from the registry - - Args: - project: Filter stream feature views based on project name - allow_cache: Whether to allow returning stream feature views from a cached registry - - Returns: - List of stream feature views - """ - - # on demand feature view operations - @abstractmethod - def get_on_demand_feature_view( - self, name: str, project: str, allow_cache: bool = False - ) -> OnDemandFeatureView: - """ - Retrieves an on demand feature view. - - Args: - name: Name of on demand feature view - project: Feast project that this on demand feature view belongs to - allow_cache: Whether to allow returning this on demand feature view from a cached registry - - Returns: - Returns either the specified on demand feature view, or raises an exception if - none is found - """ - - @abstractmethod - def list_on_demand_feature_views( - self, project: str, allow_cache: bool = False - ) -> List[OnDemandFeatureView]: - """ - Retrieve a list of on demand feature views from the registry - - Args: - project: Filter on demand feature views based on project name - allow_cache: Whether to allow returning on demand feature views from a cached registry - - Returns: - List of on demand feature views - """ - - # regular feature view operations - @abstractmethod - def get_feature_view( - self, name: str, project: str, allow_cache: bool = False - ) -> FeatureView: - """ - Retrieves a feature view. - - Args: - name: Name of feature view - project: Feast project that this feature view belongs to - allow_cache: Allow returning feature view from the cached registry - - Returns: - Returns either the specified feature view, or raises an exception if - none is found - """ - - @abstractmethod - def list_feature_views( - self, project: str, allow_cache: bool = False - ) -> List[FeatureView]: - """ - Retrieve a list of feature views from the registry - - Args: - allow_cache: Allow returning feature views from the cached registry - project: Filter feature views based on project name - - Returns: - List of feature views - """ - - # request feature view operations - @abstractmethod - def get_request_feature_view(self, name: str, project: str) -> RequestFeatureView: - """ - Retrieves a request feature view. - - Args: - name: Name of request feature view - project: Feast project that this feature view belongs to - allow_cache: Allow returning feature view from the cached registry - - Returns: - Returns either the specified feature view, or raises an exception if - none is found - """ - - @abstractmethod - def list_request_feature_views( - self, project: str, allow_cache: bool = False - ) -> List[RequestFeatureView]: - """ - Retrieve a list of request feature views from the registry - - Args: - allow_cache: Allow returning feature views from the cached registry - project: Filter feature views based on project name - - Returns: - List of request feature views - """ - - @abstractmethod - def apply_materialization( - self, - feature_view: FeatureView, - project: str, - start_date: datetime, - end_date: datetime, - commit: bool = True, - ): - """ - Updates materialization intervals tracked for a single feature view in Feast - - Args: - feature_view: Feature view that will be updated with an additional materialization interval tracked - project: Feast project that this feature view belongs to - start_date (datetime): Start date of the materialization interval to track - end_date (datetime): End date of the materialization interval to track - commit: Whether the change should be persisted immediately - """ - - # Saved dataset operations - @abstractmethod - def apply_saved_dataset( - self, - saved_dataset: SavedDataset, - project: str, - commit: bool = True, - ): - """ - Stores a saved dataset metadata with Feast - - Args: - saved_dataset: SavedDataset that will be added / updated to registry - project: Feast project that this dataset belongs to - commit: Whether the change should be persisted immediately - """ - - @abstractmethod - def get_saved_dataset( - self, name: str, project: str, allow_cache: bool = False - ) -> SavedDataset: - """ - Retrieves a saved dataset. - - Args: - name: Name of dataset - project: Feast project that this dataset belongs to - allow_cache: Whether to allow returning this dataset from a cached registry - - Returns: - Returns either the specified SavedDataset, or raises an exception if - none is found - """ - - def delete_saved_dataset(self, name: str, project: str, allow_cache: bool = False): - """ - Delete a saved dataset. - - Args: - name: Name of dataset - project: Feast project that this dataset belongs to - allow_cache: Whether to allow returning this dataset from a cached registry - - Returns: - Returns either the specified SavedDataset, or raises an exception if - none is found - """ - - @abstractmethod - def list_saved_datasets( - self, project: str, allow_cache: bool = False - ) -> List[SavedDataset]: - """ - Retrieves a list of all saved datasets in specified project - - Args: - project: Feast project - allow_cache: Whether to allow returning this dataset from a cached registry - - Returns: - Returns the list of SavedDatasets - """ - - # Validation reference operations - @abstractmethod - def apply_validation_reference( - self, - validation_reference: ValidationReference, - project: str, - commit: bool = True, - ): - """ - Persist a validation reference - - Args: - validation_reference: ValidationReference that will be added / updated to registry - project: Feast project that this dataset belongs to - commit: Whether the change should be persisted immediately - """ - - @abstractmethod - def delete_validation_reference(self, name: str, project: str, commit: bool = True): - """ - Deletes a validation reference or raises an exception if not found. - - Args: - name: Name of validation reference - project: Feast project that this object belongs to - commit: Whether the change should be persisted immediately - """ - - @abstractmethod - def get_validation_reference( - self, name: str, project: str, allow_cache: bool = False - ) -> ValidationReference: - """ - Retrieves a validation reference. - - Args: - name: Name of dataset - project: Feast project that this dataset belongs to - allow_cache: Whether to allow returning this dataset from a cached registry - - Returns: - Returns either the specified ValidationReference, or raises an exception if - none is found - """ - - # TODO: Needs to be implemented. - def list_validation_references( - self, project: str, allow_cache: bool = False - ) -> List[ValidationReference]: - - """ - Retrieve a list of validation references from the registry - - Args: - allow_cache: Allow returning feature views from the cached registry - project: Filter feature views based on project name - - Returns: - List of request feature views - """ - - def list_project_metadata( - self, project: str, allow_cache: bool = False - ) -> List[ProjectMetadata]: - """ - Retrieves project metadata - - Args: - project: Filter metadata based on project name - allow_cache: Allow returning feature views from the cached registry - - Returns: - List of project metadata - """ - - @abstractmethod - def update_infra(self, infra: Infra, project: str, commit: bool = True): - """ - Updates the stored Infra object. - - Args: - infra: The new Infra object to be stored. - project: Feast project that the Infra object refers to - commit: Whether the change should be persisted immediately - """ - - @abstractmethod - def get_infra(self, project: str, allow_cache: bool = False) -> Infra: - """ - Retrieves the stored Infra object. - - Args: - project: Feast project that the Infra object refers to - allow_cache: Whether to allow returning this entity from a cached registry - - Returns: - The stored Infra object. - """ - - @abstractmethod - def apply_user_metadata( - self, - project: str, - feature_view: BaseFeatureView, - metadata_bytes: Optional[bytes], - ): - ... - - @abstractmethod - def get_user_metadata( - self, project: str, feature_view: BaseFeatureView - ) -> Optional[bytes]: - ... - - @abstractmethod - def proto(self) -> RegistryProto: - """ - Retrieves a proto version of the registry. - - Returns: - The registry proto object. - """ - - @abstractmethod - def commit(self): - """Commits the state of the registry cache to the remote registry store.""" - - @abstractmethod - def refresh(self, project: Optional[str]): - """Refreshes the state of the registry cache by fetching the registry state from the remote registry store.""" - - @staticmethod - def _message_to_sorted_dict(message: Message) -> Dict[str, Any]: - return json.loads(MessageToJson(message, sort_keys=True)) - - def to_dict(self, project: str) -> Dict[str, List[Any]]: - """Returns a dictionary representation of the registry contents for the specified project. - - For each list in the dictionary, the elements are sorted by name, so this - method can be used to compare two registries. - - Args: - project: Feast project to convert to a dict - """ - registry_dict: Dict[str, Any] = defaultdict(list) - registry_dict["project"] = project - for project_metadata in sorted(self.list_project_metadata(project=project)): - registry_dict["projectMetadata"].append( - self._message_to_sorted_dict(project_metadata.to_proto()) - ) - for data_source in sorted( - self.list_data_sources(project=project), key=lambda ds: ds.name - ): - registry_dict["dataSources"].append( - self._message_to_sorted_dict(data_source.to_proto()) - ) - for entity in sorted( - self.list_entities(project=project), key=lambda entity: entity.name - ): - registry_dict["entities"].append( - self._message_to_sorted_dict(entity.to_proto()) - ) - for feature_view in sorted( - self.list_feature_views(project=project), - key=lambda feature_view: feature_view.name, - ): - registry_dict["featureViews"].append( - self._message_to_sorted_dict(feature_view.to_proto()) - ) - for feature_service in sorted( - self.list_feature_services(project=project), - key=lambda feature_service: feature_service.name, - ): - registry_dict["featureServices"].append( - self._message_to_sorted_dict(feature_service.to_proto()) - ) - for on_demand_feature_view in sorted( - self.list_on_demand_feature_views(project=project), - key=lambda on_demand_feature_view: on_demand_feature_view.name, - ): - odfv_dict = self._message_to_sorted_dict(on_demand_feature_view.to_proto()) - - odfv_dict["spec"]["userDefinedFunction"][ - "body" - ] = on_demand_feature_view.udf_string - registry_dict["onDemandFeatureViews"].append(odfv_dict) - for request_feature_view in sorted( - self.list_request_feature_views(project=project), - key=lambda request_feature_view: request_feature_view.name, - ): - registry_dict["requestFeatureViews"].append( - self._message_to_sorted_dict(request_feature_view.to_proto()) - ) - for saved_dataset in sorted( - self.list_saved_datasets(project=project), key=lambda item: item.name - ): - registry_dict["savedDatasets"].append( - self._message_to_sorted_dict(saved_dataset.to_proto()) - ) - for infra_object in sorted(self.get_infra(project=project).infra_objects): - registry_dict["infra"].append( - self._message_to_sorted_dict(infra_object.to_proto()) - ) - return registry_dict - - def _get_project_metadata( registry_proto: Optional[RegistryProto], project: str ) -> Optional[ProjectMetadataProto]: @@ -776,10 +164,6 @@ def _init_project_metadata(cached_registry_proto: RegistryProto, project: str): class Registry(BaseRegistry): - """ - Registry: A registry allows for the management and persistence of feature definitions and related metadata. - """ - def apply_user_metadata( self, project: str, @@ -806,7 +190,7 @@ def __new__( # We override __new__ so that we can inspect registry_config and create a SqlRegistry without callers # needing to make any changes. if registry_config and registry_config.registry_type == "sql": - from feast.infra.registry_stores.sql import SqlRegistry + from feast.infra.registry.sql import SqlRegistry return SqlRegistry(registry_config, repo_path) else: @@ -864,14 +248,6 @@ def _initialize_registry(self, project: str): self._registry_store.update_registry_proto(registry_proto) def update_infra(self, infra: Infra, project: str, commit: bool = True): - """ - Updates the stored Infra object. - - Args: - infra: The new Infra object to be stored. - project: Feast project that the Infra object refers to - commit: Whether the change should be persisted immediately - """ self._prepare_registry_for_changes(project) assert self.cached_registry_proto @@ -880,30 +256,12 @@ def update_infra(self, infra: Infra, project: str, commit: bool = True): self.commit() def get_infra(self, project: str, allow_cache: bool = False) -> Infra: - """ - Retrieves the stored Infra object. - - Args: - project: Feast project that the Infra object refers to - allow_cache: Whether to allow returning this entity from a cached registry - - Returns: - The stored Infra object. - """ registry_proto = self._get_registry_proto( project=project, allow_cache=allow_cache ) return Infra.from_proto(registry_proto.infra) def apply_entity(self, entity: Entity, project: str, commit: bool = True): - """ - Registers a single entity with Feast - - Args: - entity: Entity that will be registered - project: Feast project that this entity belongs to - commit: Whether the change should be persisted immediately - """ entity.is_valid() now = datetime.utcnow() @@ -931,16 +289,6 @@ def apply_entity(self, entity: Entity, project: str, commit: bool = True): self.commit() def list_entities(self, project: str, allow_cache: bool = False) -> List[Entity]: - """ - Retrieve a list of entities from the registry - - Args: - allow_cache: Whether to allow returning entities from a cached registry - project: Filter entities based on project name - - Returns: - List of entities - """ registry_proto = self._get_registry_proto( project=project, allow_cache=allow_cache ) @@ -953,16 +301,6 @@ def list_entities(self, project: str, allow_cache: bool = False) -> List[Entity] def list_data_sources( self, project: str, allow_cache: bool = False ) -> List[DataSource]: - """ - Retrieve a list of data sources from the registry - - Args: - project: Filter data source based on project name - allow_cache: Whether to allow returning data sources from a cached registry - - Returns: - List of data sources - """ registry_proto = self._get_registry_proto( project=project, allow_cache=allow_cache ) @@ -975,14 +313,6 @@ def list_data_sources( def apply_data_source( self, data_source: DataSource, project: str, commit: bool = True ): - """ - Registers a single data source with Feast - - Args: - data_source: A data source that will be registered - project: Feast project that this data source belongs to - commit: Whether to immediately commit to the registry - """ registry = self._prepare_registry_for_changes(project) for idx, existing_data_source_proto in enumerate(registry.data_sources): if existing_data_source_proto.name == data_source.name: @@ -1000,14 +330,6 @@ def apply_data_source( self.commit() def delete_data_source(self, name: str, project: str, commit: bool = True): - """ - Deletes a data source or raises an exception if not found. - - Args: - name: Name of data source - project: Feast project that this data source belongs to - commit: Whether the change should be persisted immediately - """ self._prepare_registry_for_changes(project) assert self.cached_registry_proto @@ -1024,13 +346,6 @@ def delete_data_source(self, name: str, project: str, commit: bool = True): def apply_feature_service( self, feature_service: FeatureService, project: str, commit: bool = True ): - """ - Registers a single feature service with Feast - - Args: - feature_service: A feature service that will be registered - project: Feast project that this entity belongs to - """ now = datetime.utcnow() if not feature_service.created_timestamp: feature_service.created_timestamp = now @@ -1055,17 +370,6 @@ def apply_feature_service( def list_feature_services( self, project: str, allow_cache: bool = False ) -> List[FeatureService]: - """ - Retrieve a list of feature services from the registry - - Args: - allow_cache: Whether to allow returning entities from a cached registry - project: Filter entities based on project name - - Returns: - List of feature services - """ - registry = self._get_registry_proto(project=project, allow_cache=allow_cache) feature_services = [] for feature_service_proto in registry.feature_services: @@ -1078,18 +382,6 @@ def list_feature_services( def get_feature_service( self, name: str, project: str, allow_cache: bool = False ) -> FeatureService: - """ - Retrieves a feature service. - - Args: - name: Name of feature service - project: Feast project that this feature service belongs to - allow_cache: Whether to allow returning this feature service from a cached registry - - Returns: - Returns either the specified feature service, or raises an exception if - none is found - """ registry = self._get_registry_proto(project=project, allow_cache=allow_cache) for feature_service_proto in registry.feature_services: @@ -1101,18 +393,6 @@ def get_feature_service( raise FeatureServiceNotFoundException(name, project=project) def get_entity(self, name: str, project: str, allow_cache: bool = False) -> Entity: - """ - Retrieves an entity. - - Args: - name: Name of entity - project: Feast project that this entity belongs to - allow_cache: Whether to allow returning this entity from a cached registry - - Returns: - Returns either the specified entity, or raises an exception if - none is found - """ registry_proto = self._get_registry_proto( project=project, allow_cache=allow_cache ) @@ -1124,14 +404,6 @@ def get_entity(self, name: str, project: str, allow_cache: bool = False) -> Enti def apply_feature_view( self, feature_view: BaseFeatureView, project: str, commit: bool = True ): - """ - Registers a single feature view with Feast - - Args: - feature_view: Feature view that will be registered - project: Feast project that this feature view belongs to - commit: Whether the change should be persisted immediately - """ feature_view.ensure_valid() now = datetime.utcnow() @@ -1188,16 +460,6 @@ def apply_feature_view( def list_stream_feature_views( self, project: str, allow_cache: bool = False ) -> List[StreamFeatureView]: - """ - Retrieve a list of stream feature views from the registry - - Args: - project: Filter stream feature views based on project name - allow_cache: Whether to allow returning stream feature views from a cached registry - - Returns: - List of stream feature views - """ registry = self._get_registry_proto(project=project, allow_cache=allow_cache) stream_feature_views = [] for stream_feature_view in registry.stream_feature_views: @@ -1210,17 +472,6 @@ def list_stream_feature_views( def list_on_demand_feature_views( self, project: str, allow_cache: bool = False ) -> List[OnDemandFeatureView]: - """ - Retrieve a list of on demand feature views from the registry - - Args: - project: Filter on demand feature views based on project name - allow_cache: Whether to allow returning on demand feature views from a cached registry - - Returns: - List of on demand feature views - """ - registry = self._get_registry_proto(project=project, allow_cache=allow_cache) on_demand_feature_views = [] for on_demand_feature_view in registry.on_demand_feature_views: @@ -1233,18 +484,6 @@ def list_on_demand_feature_views( def get_on_demand_feature_view( self, name: str, project: str, allow_cache: bool = False ) -> OnDemandFeatureView: - """ - Retrieves an on demand feature view. - - Args: - name: Name of on demand feature view - project: Feast project that this on demand feature view belongs to - allow_cache: Whether to allow returning this on demand feature view from a cached registry - - Returns: - Returns either the specified on demand feature view, or raises an exception if - none is found - """ registry = self._get_registry_proto(project=project, allow_cache=allow_cache) for on_demand_feature_view in registry.on_demand_feature_views: @@ -1258,17 +497,6 @@ def get_on_demand_feature_view( def get_data_source( self, name: str, project: str, allow_cache: bool = False ) -> DataSource: - """ - Retrieves a data source. - - Args: - name: Name of data source - project: Feast project that this data source belongs to - allow_cache: Whether to allow returning this data source from a cached registry - - Returns: - Returns either the specified data source, or raises an exception if none is found - """ registry = self._get_registry_proto(project=project, allow_cache=allow_cache) for data_source in registry.data_sources: @@ -1284,16 +512,6 @@ def apply_materialization( end_date: datetime, commit: bool = True, ): - """ - Updates materialization intervals tracked for a single feature view in Feast - - Args: - feature_view: Feature view that will be updated with an additional materialization interval tracked - project: Feast project that this feature view belongs to - start_date (datetime): Start date of the materialization interval to track - end_date (datetime): End date of the materialization interval to track - commit: Whether the change should be persisted immediately - """ self._prepare_registry_for_changes(project) assert self.cached_registry_proto @@ -1348,16 +566,6 @@ def apply_materialization( def list_feature_views( self, project: str, allow_cache: bool = False ) -> List[FeatureView]: - """ - Retrieve a list of feature views from the registry - - Args: - allow_cache: Allow returning feature views from the cached registry - project: Filter feature views based on project name - - Returns: - List of feature views - """ registry_proto = self._get_registry_proto( project=project, allow_cache=allow_cache ) @@ -1368,17 +576,6 @@ def list_feature_views( return feature_views def get_request_feature_view(self, name: str, project: str): - """ - Retrieves a feature view. - - Args: - name: Name of feature view - project: Feast project that this feature view belongs to - - Returns: - Returns either the specified feature view, or raises an exception if - none is found - """ registry_proto = self._get_registry_proto(project=project, allow_cache=False) for feature_view_proto in registry_proto.feature_views: if ( @@ -1391,16 +588,6 @@ def get_request_feature_view(self, name: str, project: str): def list_request_feature_views( self, project: str, allow_cache: bool = False ) -> List[RequestFeatureView]: - """ - Retrieve a list of request feature views from the registry - - Args: - allow_cache: Allow returning feature views from the cached registry - project: Filter feature views based on project name - - Returns: - List of feature views - """ registry_proto = self._get_registry_proto( project=project, allow_cache=allow_cache ) @@ -1415,18 +602,6 @@ def list_request_feature_views( def get_feature_view( self, name: str, project: str, allow_cache: bool = False ) -> FeatureView: - """ - Retrieves a feature view. - - Args: - name: Name of feature view - project: Feast project that this feature view belongs to - allow_cache: Allow returning feature view from the cached registry - - Returns: - Returns either the specified feature view, or raises an exception if - none is found - """ registry_proto = self._get_registry_proto( project=project, allow_cache=allow_cache ) @@ -1441,18 +616,6 @@ def get_feature_view( def get_stream_feature_view( self, name: str, project: str, allow_cache: bool = False ) -> StreamFeatureView: - """ - Retrieves a stream feature view. - - Args: - name: Name of stream feature view - project: Feast project that this stream feature view belongs to - allow_cache: Allow returning feature view from the cached registry - - Returns: - Returns either the specified feature view, or raises an exception if - none is found - """ registry_proto = self._get_registry_proto( project=project, allow_cache=allow_cache ) @@ -1465,14 +628,6 @@ def get_stream_feature_view( raise FeatureViewNotFoundException(name, project) def delete_feature_service(self, name: str, project: str, commit: bool = True): - """ - Deletes a feature service or raises an exception if not found. - - Args: - name: Name of feature service - project: Feast project that this feature service belongs to - commit: Whether the change should be persisted immediately - """ self._prepare_registry_for_changes(project) assert self.cached_registry_proto @@ -1490,14 +645,6 @@ def delete_feature_service(self, name: str, project: str, commit: bool = True): raise FeatureServiceNotFoundException(name, project) def delete_feature_view(self, name: str, project: str, commit: bool = True): - """ - Deletes a feature view or raises an exception if not found. - - Args: - name: Name of feature view - project: Feast project that this feature view belongs to - commit: Whether the change should be persisted immediately - """ self._prepare_registry_for_changes(project) assert self.cached_registry_proto @@ -1552,14 +699,6 @@ def delete_feature_view(self, name: str, project: str, commit: bool = True): raise FeatureViewNotFoundException(name, project) def delete_entity(self, name: str, project: str, commit: bool = True): - """ - Deletes an entity or raises an exception if not found. - - Args: - name: Name of entity - project: Feast project that this entity belongs to - commit: Whether the change should be persisted immediately - """ self._prepare_registry_for_changes(project) assert self.cached_registry_proto @@ -1583,14 +722,6 @@ def apply_saved_dataset( project: str, commit: bool = True, ): - """ - Stores a saved dataset metadata with Feast - - Args: - saved_dataset: SavedDataset that will be added / updated to registry - project: Feast project that this dataset belongs to - commit: Whether the change should be persisted immediately - """ now = datetime.utcnow() if not saved_dataset.created_timestamp: saved_dataset.created_timestamp = now @@ -1618,18 +749,6 @@ def apply_saved_dataset( def get_saved_dataset( self, name: str, project: str, allow_cache: bool = False ) -> SavedDataset: - """ - Retrieves a saved dataset. - - Args: - name: Name of dataset - project: Feast project that this dataset belongs to - allow_cache: Whether to allow returning this dataset from a cached registry - - Returns: - Returns either the specified SavedDataset, or raises an exception if - none is found - """ registry_proto = self._get_registry_proto( project=project, allow_cache=allow_cache ) @@ -1644,16 +763,6 @@ def get_saved_dataset( def list_saved_datasets( self, project: str, allow_cache: bool = False ) -> List[SavedDataset]: - """ - Retrieves a list of all saved datasets in specified project - - Args: - project: Feast project - allow_cache: Whether to allow returning this dataset from a cached registry - - Returns: - Returns the list of SavedDatasets - """ registry_proto = self._get_registry_proto( project=project, allow_cache=allow_cache ) @@ -1669,14 +778,6 @@ def apply_validation_reference( project: str, commit: bool = True, ): - """ - Persist a validation reference - - Args: - validation_reference: ValidationReference that will be added / updated to registry - project: Feast project that this dataset belongs to - commit: Whether the change should be persisted immediately - """ validation_reference_proto = validation_reference.to_proto() validation_reference_proto.project = project @@ -1698,18 +799,6 @@ def apply_validation_reference( def get_validation_reference( self, name: str, project: str, allow_cache: bool = False ) -> ValidationReference: - """ - Retrieves a validation reference. - - Args: - name: Name of dataset - project: Feast project that this dataset belongs to - allow_cache: Whether to allow returning this dataset from a cached registry - - Returns: - Returns either the specified ValidationReference, or raises an exception if - none is found - """ registry_proto = self._get_registry_proto( project=project, allow_cache=allow_cache ) @@ -1722,14 +811,6 @@ def get_validation_reference( raise ValidationReferenceNotFound(name, project=project) def delete_validation_reference(self, name: str, project: str, commit: bool = True): - """ - Deletes a validation reference or raises an exception if not found. - - Args: - name: Name of validation reference - project: Feast project that this object belongs to - commit: Whether the change should be persisted immediately - """ registry_proto = self._prepare_registry_for_changes(project) for idx, existing_validation_reference in enumerate( registry_proto.validation_references diff --git a/sdk/python/feast/registry_store.py b/sdk/python/feast/infra/registry/registry_store.py similarity index 100% rename from sdk/python/feast/registry_store.py rename to sdk/python/feast/infra/registry/registry_store.py diff --git a/sdk/python/feast/infra/registry/s3.py b/sdk/python/feast/infra/registry/s3.py new file mode 100644 index 0000000000..d3772910f5 --- /dev/null +++ b/sdk/python/feast/infra/registry/s3.py @@ -0,0 +1,80 @@ +import os +import uuid +from datetime import datetime +from pathlib import Path +from tempfile import TemporaryFile +from urllib.parse import urlparse + +from feast.errors import S3RegistryBucketForbiddenAccess, S3RegistryBucketNotExist +from feast.infra.registry.registry_store import RegistryStore +from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto +from feast.repo_config import RegistryConfig +from feast.usage import log_exceptions_and_usage + +try: + import boto3 +except ImportError as e: + from feast.errors import FeastExtrasDependencyImportError + + raise FeastExtrasDependencyImportError("aws", str(e)) + + +class S3RegistryStore(RegistryStore): + def __init__(self, registry_config: RegistryConfig, repo_path: Path): + uri = registry_config.path + self._uri = urlparse(uri) + self._bucket = self._uri.hostname + self._key = self._uri.path.lstrip("/") + + self.s3_client = boto3.resource( + "s3", endpoint_url=os.environ.get("FEAST_S3_ENDPOINT_URL") + ) + + @log_exceptions_and_usage(registry="s3") + def get_registry_proto(self): + file_obj = TemporaryFile() + registry_proto = RegistryProto() + try: + from botocore.exceptions import ClientError + except ImportError as e: + from feast.errors import FeastExtrasDependencyImportError + + raise FeastExtrasDependencyImportError("aws", str(e)) + try: + bucket = self.s3_client.Bucket(self._bucket) + self.s3_client.meta.client.head_bucket(Bucket=bucket.name) + except ClientError as e: + # If a client error is thrown, then check that it was a 404 error. + # If it was a 404 error, then the bucket does not exist. + error_code = int(e.response["Error"]["Code"]) + if error_code == 404: + raise S3RegistryBucketNotExist(self._bucket) + else: + raise S3RegistryBucketForbiddenAccess(self._bucket) from e + + try: + obj = bucket.Object(self._key) + obj.download_fileobj(file_obj) + file_obj.seek(0) + registry_proto.ParseFromString(file_obj.read()) + return registry_proto + except ClientError as e: + raise FileNotFoundError( + f"Error while trying to locate Registry at path {self._uri.geturl()}" + ) from e + + @log_exceptions_and_usage(registry="s3") + def update_registry_proto(self, registry_proto: RegistryProto): + self._write_registry(registry_proto) + + def teardown(self): + self.s3_client.Object(self._bucket, self._key).delete() + + def _write_registry(self, registry_proto: RegistryProto): + registry_proto.version_id = str(uuid.uuid4()) + registry_proto.last_updated.FromDatetime(datetime.utcnow()) + # we have already checked the bucket exists so no need to do it again + file_obj = TemporaryFile() + file_obj.write(registry_proto.SerializeToString()) + file_obj.seek(0) + self.s3_client.Bucket(self._bucket).put_object(Body=file_obj, Key=self._key) diff --git a/sdk/python/feast/infra/registry_stores/sql.py b/sdk/python/feast/infra/registry/sql.py similarity index 99% rename from sdk/python/feast/infra/registry_stores/sql.py rename to sdk/python/feast/infra/registry/sql.py index 39ed2a802e..55412b33be 100644 --- a/sdk/python/feast/infra/registry_stores/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -34,6 +34,7 @@ from feast.feature_service import FeatureService from feast.feature_view import FeatureView from feast.infra.infra_object import Infra +from feast.infra.registry.base_registry import BaseRegistry from feast.on_demand_feature_view import OnDemandFeatureView from feast.project_metadata import ProjectMetadata from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto @@ -57,7 +58,6 @@ from feast.protos.feast.core.ValidationProfile_pb2 import ( ValidationReference as ValidationReferenceProto, ) -from feast.registry import BaseRegistry from feast.repo_config import RegistryConfig from feast.request_feature_view import RequestFeatureView from feast.saved_dataset import SavedDataset, ValidationReference diff --git a/sdk/python/feast/infra/transformation_servers/app.py b/sdk/python/feast/infra/transformation_servers/app.py index acfb0959ba..7afba69beb 100644 --- a/sdk/python/feast/infra/transformation_servers/app.py +++ b/sdk/python/feast/infra/transformation_servers/app.py @@ -13,8 +13,8 @@ FEATURE_TRANSFORMATION_SERVER_PORT_ENV_NAME, REGISTRY_ENV_NAME, ) -from feast.infra.local import LocalRegistryStore -from feast.registry import get_registry_store_class_from_scheme +from feast.infra.registry.file import FileRegistryStore +from feast.infra.registry.registry import get_registry_store_class_from_scheme # Load RepoConfig config_base64 = os.environ[FEATURE_STORE_YAML_ENV_NAME] @@ -32,7 +32,7 @@ registry = raw_config["registry"] registry_path = registry["path"] if isinstance(registry, dict) else registry registry_store_class = get_registry_store_class_from_scheme(registry_path) -if registry_store_class == LocalRegistryStore and not os.path.exists(registry_path): +if registry_store_class == FileRegistryStore and not os.path.exists(registry_path): registry_base64 = os.environ[REGISTRY_ENV_NAME] registry_bytes = base64.b64decode(registry_base64) registry_dir = os.path.dirname(registry_path) diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index c7d49f916c..007d87bf75 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -21,9 +21,9 @@ from feast.feature_store import FeatureStore from feast.feature_view import DUMMY_ENTITY, FeatureView from feast.file_utils import replace_str_in_file +from feast.infra.registry.registry import FEAST_OBJECT_TYPES, FeastObjectType, Registry from feast.names import adjectives, animals from feast.on_demand_feature_view import OnDemandFeatureView -from feast.registry import FEAST_OBJECT_TYPES, FeastObjectType, Registry from feast.repo_config import RepoConfig from feast.repo_contents import RepoContents from feast.request_feature_view import RequestFeatureView diff --git a/sdk/python/tests/foo_provider.py b/sdk/python/tests/foo_provider.py index 7866465b91..d27e2645d4 100644 --- a/sdk/python/tests/foo_provider.py +++ b/sdk/python/tests/foo_provider.py @@ -9,9 +9,9 @@ from feast import Entity, FeatureService, FeatureView, RepoConfig from feast.infra.offline_stores.offline_store import RetrievalJob from feast.infra.provider import Provider +from feast.infra.registry.base_registry import BaseRegistry from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto -from feast.registry import BaseRegistry from feast.saved_dataset import SavedDataset diff --git a/sdk/python/tests/foo_registry_store.py b/sdk/python/tests/foo_registry_store.py index 31fb653e9b..a537ab344b 100644 --- a/sdk/python/tests/foo_registry_store.py +++ b/sdk/python/tests/foo_registry_store.py @@ -1,7 +1,7 @@ from pathlib import Path +from feast.infra.registry.registry_store import RegistryStore from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto -from feast.registry_store import RegistryStore from feast.repo_config import RegistryConfig diff --git a/sdk/python/tests/integration/e2e/test_usage_e2e.py b/sdk/python/tests/integration/e2e/test_usage_e2e.py index 73f87a01c7..4c8be46890 100644 --- a/sdk/python/tests/integration/e2e/test_usage_e2e.py +++ b/sdk/python/tests/integration/e2e/test_usage_e2e.py @@ -70,10 +70,10 @@ def test_usage_on(dummy_exporter, enabling_toggle): assert len(dummy_exporter) == 3 assert { - "entrypoint": "feast.infra.local.LocalRegistryStore.get_registry_proto" + "entrypoint": "feast.infra.registry.file.FileRegistryStore.get_registry_proto" }.items() <= dummy_exporter[0].items() assert { - "entrypoint": "feast.infra.local.LocalRegistryStore.update_registry_proto" + "entrypoint": "feast.infra.registry.file.FileRegistryStore.update_registry_proto" }.items() <= dummy_exporter[1].items() assert { "entrypoint": "feast.feature_store.FeatureStore.apply" @@ -140,7 +140,7 @@ def test_exception_usage_off(dummy_exporter, enabling_toggle): def _reload_feast(): """After changing environment need to reload modules and rerun usage decorators""" modules = ( - "feast.infra.local", + "feast.infra.registry.file", "feast.infra.online_stores.sqlite", "feast.feature_store", ) diff --git a/sdk/python/tests/integration/registration/test_registry.py b/sdk/python/tests/integration/registration/test_registry.py index 78fdc866a3..739fb9ec5c 100644 --- a/sdk/python/tests/integration/registration/test_registry.py +++ b/sdk/python/tests/integration/registration/test_registry.py @@ -23,7 +23,7 @@ from feast.entity import Entity from feast.feature_view import FeatureView from feast.field import Field -from feast.registry import Registry +from feast.infra.registry.registry import Registry from feast.repo_config import RegistryConfig from feast.types import Array, Bytes, Int64, String from tests.utils.e2e_test_validation import validate_registry_data_source_apply diff --git a/sdk/python/tests/unit/infra/test_local_registry.py b/sdk/python/tests/unit/infra/test_local_registry.py index 2f8a18dd44..1e3b2aec88 100644 --- a/sdk/python/tests/unit/infra/test_local_registry.py +++ b/sdk/python/tests/unit/infra/test_local_registry.py @@ -25,8 +25,8 @@ from feast.entity import Entity from feast.feature_view import FeatureView from feast.field import Field +from feast.infra.registry.registry import Registry from feast.on_demand_feature_view import RequestSource, on_demand_feature_view -from feast.registry import Registry from feast.repo_config import RegistryConfig from feast.stream_feature_view import StreamFeatureView from feast.types import Array, Bytes, Float32, Int32, Int64, String diff --git a/sdk/python/tests/unit/test_sql_registry.py b/sdk/python/tests/unit/test_sql_registry.py index 85746eb9c5..c1b45848ce 100644 --- a/sdk/python/tests/unit/test_sql_registry.py +++ b/sdk/python/tests/unit/test_sql_registry.py @@ -28,7 +28,7 @@ from feast.errors import FeatureViewNotFoundException from feast.feature_view import FeatureView from feast.field import Field -from feast.infra.registry_stores.sql import SqlRegistry +from feast.infra.registry.sql import SqlRegistry from feast.on_demand_feature_view import on_demand_feature_view from feast.repo_config import RegistryConfig from feast.types import Array, Bytes, Float32, Int32, Int64, String diff --git a/sdk/python/tests/utils/e2e_test_validation.py b/sdk/python/tests/utils/e2e_test_validation.py index ca6c1e4b39..c87f8fd61f 100644 --- a/sdk/python/tests/utils/e2e_test_validation.py +++ b/sdk/python/tests/utils/e2e_test_validation.py @@ -14,7 +14,7 @@ from feast.data_format import ParquetFormat from feast.entity import Entity from feast.field import Field -from feast.registry import Registry +from feast.infra.registry.registry import Registry from feast.types import Array, Bytes, Int64, String from tests.integration.feature_repos.integration_test_repo_config import ( IntegrationTestRepoConfig,