Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Registry teardown #1718

Merged
merged 7 commits into from
Jul 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,7 @@ def teardown(self):
entities = self.list_entities()

self._get_provider().teardown_infra(self.project, tables, entities)
for feature_view in feature_views:
self.delete_feature_view(feature_view.name)
for feature_table in feature_tables:
self._registry.delete_feature_table(feature_table.name, self.project)
self._registry.teardown()

@log_exceptions_and_usage
def get_historical_features(
Expand Down
40 changes: 32 additions & 8 deletions sdk/python/feast/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ def __init__(self, registry_path: str, repo_path: Path, cache_ttl: timedelta):
f"Registry path {registry_path} has unsupported scheme {uri.scheme}. Supported schemes are file and gs."
)
self.cached_registry_proto_ttl = cache_ttl
return

def _initialize_registry(self):
"""Explicitly initializes the registry with an empty proto."""
Expand Down Expand Up @@ -109,7 +108,6 @@ def apply_entity(self, entity: Entity, project: str, commit: bool = True):
self.cached_registry_proto.entities.append(entity_proto)
if commit:
self.commit()
return

def list_entities(self, project: str, allow_cache: bool = False) -> List[Entity]:
"""
Expand Down Expand Up @@ -396,6 +394,10 @@ def refresh(self):
"""Refreshes the state of the registry cache by fetching the registry state from the remote registry store."""
self._get_registry_proto(allow_cache=False)

def teardown(self):
"""Tears down (removes) the registry."""
self._registry_store.teardown()

def _prepare_registry_for_changes(self):
"""Prepares the Registry for changes by refreshing the cache if necessary."""
try:
Expand Down Expand Up @@ -469,6 +471,13 @@ def update_registry_proto(self, registry_proto: RegistryProto):
"""
pass

@abstractmethod
def teardown(self):
"""
Tear down all resources.
"""
pass


class LocalRegistryStore(RegistryStore):
def __init__(self, repo_path: Path, registry_path_string: str):
Expand All @@ -489,15 +498,21 @@ def get_registry_proto(self):

def update_registry_proto(self, registry_proto: RegistryProto):
    """Persist ``registry_proto`` by handing it to ``_write_registry``."""
    self._write_registry(registry_proto)

def teardown(self):
try:
self._filepath.unlink()
except FileNotFoundError:
# If the file deletion fails with FileNotFoundError, the file has already
# been deleted.
pass

def _write_registry(self, registry_proto: RegistryProto):
    """Stamp ``registry_proto`` and write it to ``self._filepath``.

    The proto is mutated in place before serialization: ``version_id`` is
    set to a fresh random UUID string and ``last_updated`` to the current
    UTC time. The serialized bytes then replace the contents of the
    registry file.

    Args:
        registry_proto: The registry proto to stamp and persist.
    """
    registry_proto.version_id = str(uuid.uuid4())
    registry_proto.last_updated.FromDatetime(datetime.utcnow())
    # parents=True: a registry path nested under directories that do not
    # exist yet would otherwise make mkdir raise FileNotFoundError.
    self._filepath.parent.mkdir(parents=True, exist_ok=True)
    self._filepath.write_bytes(registry_proto.SerializeToString())


class GCSRegistryStore(RegistryStore):
Expand All @@ -513,7 +528,6 @@ def __init__(self, uri: str):
self._uri = urlparse(uri)
self._bucket = self._uri.hostname
self._blob = self._uri.path.lstrip("/")
return

def get_registry_proto(self):
from google.cloud import storage
Expand All @@ -540,7 +554,16 @@ def get_registry_proto(self):

def update_registry_proto(self, registry_proto: RegistryProto):
    """Persist ``registry_proto`` by handing it to ``_write_registry``."""
    self._write_registry(registry_proto)

def teardown(self):
    """Delete the registry blob ``self._blob`` from bucket ``self._bucket``.

    A ``NotFound`` error raised by the blob deletion is ignored, since it
    means the blob is already gone. The bucket lookup itself is not
    guarded, so errors from ``get_bucket`` propagate to the caller.
    """
    # Imported lazily, inside the method, as in the rest of this store.
    from google.cloud.exceptions import NotFound

    bucket = self.gcs_client.get_bucket(self._bucket)
    try:
        bucket.delete_blob(self._blob)
    except NotFound:
        # The blob has already been deleted; nothing more to do.
        return

def _write_registry(self, registry_proto: RegistryProto):
registry_proto.version_id = str(uuid.uuid4())
Expand All @@ -552,7 +575,6 @@ def _write_registry(self, registry_proto: RegistryProto):
file_obj.write(registry_proto.SerializeToString())
file_obj.seek(0)
blob.upload_from_file(file_obj)
return


class S3RegistryStore(RegistryStore):
Expand Down Expand Up @@ -605,7 +627,9 @@ def get_registry_proto(self):

def update_registry_proto(self, registry_proto: RegistryProto):
    """Persist ``registry_proto`` by handing it to ``_write_registry``."""
    self._write_registry(registry_proto)

def teardown(self):
self.s3_client.Object(self._bucket, self._key).delete()

def _write_registry(self, registry_proto: RegistryProto):
registry_proto.version_id = str(uuid.uuid4())
Expand Down
21 changes: 4 additions & 17 deletions sdk/python/feast/repo_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from click.exceptions import BadParameter

from feast import Entity, FeatureTable
from feast.feature_store import FeatureStore
from feast.feature_view import FeatureView
from feast.inference import (
update_data_sources_with_inferred_event_timestamp_col,
Expand Down Expand Up @@ -242,23 +243,9 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation

@log_exceptions_and_usage
def teardown(repo_config: RepoConfig, repo_path: Path):
    """Tear down the feature repository located at ``repo_path``.

    All work is delegated to ``FeatureStore.teardown``, which tears down the
    provider infrastructure and then the registry. Doing the infrastructure
    teardown here as well would run it twice.

    Args:
        repo_config: Kept for interface compatibility; not used directly,
            because the store is constructed from ``repo_path`` alone.
        repo_path: Path of the feature repository to tear down.
    """
    # Cannot pass in both repo_path and repo_config to FeatureStore.
    feature_store = FeatureStore(repo_path=repo_path, config=None)
    feature_store.teardown()


@log_exceptions_and_usage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ def prep_redshift_fs_and_fv(

yield fs, fv

fs.teardown()

# Clean up the uploaded Redshift table
aws_utils.execute_redshift_statement(
client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,8 @@ def test_historical_features_from_parquet_sources(
).reset_index(drop=True),
)

store.teardown()


@pytest.mark.integration
@pytest.mark.parametrize(
Expand Down Expand Up @@ -596,6 +598,8 @@ def test_historical_features_from_bigquery_sources(
actual_df_from_df_entities, table_from_df_entities.to_pandas()
)

store.teardown()


@pytest.mark.integration
def test_timestamp_bound_inference_from_entity_df_using_bigquery():
Expand Down
14 changes: 14 additions & 0 deletions sdk/python/tests/integration/registration/test_feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ def test_apply_entity_success(test_feature_store):
and entity.labels["team"] == "matchmaking"
)

test_feature_store.teardown()


@pytest.mark.integration
@pytest.mark.parametrize(
Expand Down Expand Up @@ -154,6 +156,8 @@ def test_apply_entity_integration(test_feature_store):
and entity.labels["team"] == "matchmaking"
)

test_feature_store.teardown()


@pytest.mark.parametrize(
"test_feature_store", [lazy_fixture("feature_store_with_local_registry")],
Expand Down Expand Up @@ -202,6 +206,8 @@ def test_apply_feature_view_success(test_feature_store):
and feature_views[0].entities[0] == "fs1_my_entity_1"
)

test_feature_store.teardown()


@pytest.mark.integration
@pytest.mark.parametrize(
Expand Down Expand Up @@ -266,6 +272,8 @@ def test_feature_view_inference_success(test_feature_store, dataframe_source):
== actual_bq_using_query_arg_source
)

test_feature_store.teardown()


@pytest.mark.integration
@pytest.mark.parametrize(
Expand Down Expand Up @@ -337,6 +345,8 @@ def test_apply_feature_view_integration(test_feature_store):
feature_views = test_feature_store.list_feature_views()
assert len(feature_views) == 0

test_feature_store.teardown()


@pytest.mark.parametrize(
"test_feature_store", [lazy_fixture("feature_store_with_local_registry")],
Expand Down Expand Up @@ -398,6 +408,8 @@ def test_apply_object_and_read(test_feature_store):
assert fv2 != fv1_actual
assert e2 != e1_actual

test_feature_store.teardown()


def test_apply_remote_repo():
fd, registry_path = mkstemp()
Expand Down Expand Up @@ -466,3 +478,5 @@ def test_reapply_feature_view_success(test_feature_store, dataframe_source):
# Check Feature View
fv_stored = test_feature_store.get_feature_view(fv1.name)
assert len(fv_stored.materialization_intervals) == 0

test_feature_store.teardown()
30 changes: 30 additions & 0 deletions sdk/python/tests/integration/registration/test_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ def test_apply_entity_success(test_registry):
and entity.labels["team"] == "matchmaking"
)

test_registry.teardown()

# Will try to reload registry, which will fail because the file has been deleted
with pytest.raises(FileNotFoundError):
test_registry._get_registry_proto()


@pytest.mark.integration
@pytest.mark.parametrize(
Expand Down Expand Up @@ -135,6 +141,12 @@ def test_apply_entity_integration(test_registry):
and entity.labels["team"] == "matchmaking"
)

test_registry.teardown()

# Will try to reload registry, which will fail because the file has been deleted
with pytest.raises(FileNotFoundError):
test_registry._get_registry_proto()


@pytest.mark.parametrize(
"test_registry", [lazy_fixture("local_registry")],
Expand Down Expand Up @@ -203,6 +215,12 @@ def test_apply_feature_view_success(test_registry):
feature_views = test_registry.list_feature_views(project)
assert len(feature_views) == 0

test_registry.teardown()

# Will try to reload registry, which will fail because the file has been deleted
with pytest.raises(FileNotFoundError):
test_registry._get_registry_proto()


@pytest.mark.integration
@pytest.mark.parametrize(
Expand Down Expand Up @@ -272,6 +290,12 @@ def test_apply_feature_view_integration(test_registry):
feature_views = test_registry.list_feature_views(project)
assert len(feature_views) == 0

test_registry.teardown()

# Will try to reload registry, which will fail because the file has been deleted
with pytest.raises(FileNotFoundError):
test_registry._get_registry_proto()


def test_commit():
fd, registry_path = mkstemp()
Expand Down Expand Up @@ -345,3 +369,9 @@ def test_commit():
and "team" in entity.labels
and entity.labels["team"] == "matchmaking"
)

test_registry.teardown()

# Will try to reload registry, which will fail because the file has been deleted
with pytest.raises(FileNotFoundError):
test_registry._get_registry_proto()