Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include infra objects in registry dump and fix Infra's from_proto #2295

Merged
merged 7 commits into from
Feb 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion sdk/python/feast/infra/infra_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ class InfraObject(ABC):
Represents a single infrastructure object (e.g. online store table) managed by Feast.
"""

@abstractmethod
def __init__(self, name: str):
self._name = name

@property
def name(self) -> str:
return self._name

@abstractmethod
def to_infra_object_proto(self) -> InfraObjectProto:
"""Converts an InfraObject to its protobuf representation, wrapped in an InfraObjectProto."""
Expand All @@ -47,6 +55,9 @@ def to_proto(self) -> Any:
"""Converts an InfraObject to its protobuf representation."""
pass

def __lt__(self, other) -> bool:
return self.name < other.name

@staticmethod
@abstractmethod
def from_infra_object_proto(infra_object_proto: InfraObjectProto) -> Any:
Expand Down Expand Up @@ -136,7 +147,7 @@ def from_proto(cls, infra_proto: InfraProto):
Returns an Infra object created from a protobuf representation.
"""
infra = cls()
cls.infra_objects += [
infra.infra_objects += [
InfraObject.from_infra_object_proto(infra_object_proto)
for infra_object_proto in infra_proto.infra_objects
]
Expand Down
3 changes: 1 addition & 2 deletions sdk/python/feast/infra/online_stores/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,6 @@ class DatastoreTable(InfraObject):
"""

project: str
name: str
project_id: Optional[str]
namespace: Optional[str]

Expand All @@ -347,8 +346,8 @@ def __init__(
project_id: Optional[str] = None,
namespace: Optional[str] = None,
):
super().__init__(name)
self.project = project
self.name = name
self.project_id = project_id
self.namespace = namespace

Expand Down
3 changes: 1 addition & 2 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,10 @@ class DynamoDBTable(InfraObject):
region: The region of the table.
"""

name: str
region: str

def __init__(self, name: str, region: str):
self.name = name
super().__init__(name)
self.region = region

def to_infra_object_proto(self) -> InfraObjectProto:
Expand Down
3 changes: 1 addition & 2 deletions sdk/python/feast/infra/online_stores/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,11 @@ class SqliteTable(InfraObject):
"""

path: str
name: str
conn: sqlite3.Connection

def __init__(self, path: str, name: str):
super().__init__(name)
self.path = path
self.name = name
self.conn = _initialize_conn(path)

def to_infra_object_proto(self) -> InfraObjectProto:
Expand Down
27 changes: 20 additions & 7 deletions sdk/python/feast/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
from collections import defaultdict
from datetime import datetime, timedelta
Expand All @@ -21,7 +22,7 @@
from urllib.parse import urlparse

from google.protobuf.internal.containers import RepeatedCompositeFieldContainer
from google.protobuf.json_format import MessageToDict
from google.protobuf.json_format import MessageToJson
from proto import Message

from feast.base_feature_view import BaseFeatureView
Expand Down Expand Up @@ -797,41 +798,53 @@ def to_dict(self, project: str) -> Dict[str, List[Any]]:
for entity in sorted(
self.list_entities(project=project), key=lambda entity: entity.name
):
registry_dict["entities"].append(MessageToDict(entity.to_proto()))
registry_dict["entities"].append(
self._message_to_sorted_dict(entity.to_proto())
)
for feature_view in sorted(
self.list_feature_views(project=project),
key=lambda feature_view: feature_view.name,
):
registry_dict["featureViews"].append(MessageToDict(feature_view.to_proto()))
registry_dict["featureViews"].append(
self._message_to_sorted_dict(feature_view.to_proto())
)
for feature_service in sorted(
self.list_feature_services(project=project),
key=lambda feature_service: feature_service.name,
):
registry_dict["featureServices"].append(
MessageToDict(feature_service.to_proto())
self._message_to_sorted_dict(feature_service.to_proto())
)
for on_demand_feature_view in sorted(
self.list_on_demand_feature_views(project=project),
key=lambda on_demand_feature_view: on_demand_feature_view.name,
):
registry_dict["onDemandFeatureViews"].append(
MessageToDict(on_demand_feature_view.to_proto())
self._message_to_sorted_dict(on_demand_feature_view.to_proto())
)
for request_feature_view in sorted(
self.list_request_feature_views(project=project),
key=lambda request_feature_view: request_feature_view.name,
):
registry_dict["requestFeatureViews"].append(
MessageToDict(request_feature_view.to_proto())
self._message_to_sorted_dict(request_feature_view.to_proto())
)
for saved_dataset in sorted(
self.list_saved_datasets(project=project), key=lambda item: item.name
):
registry_dict["savedDatasets"].append(
MessageToDict(saved_dataset.to_proto())
self._message_to_sorted_dict(saved_dataset.to_proto())
)
for infra_object in sorted(self.get_infra(project=project).infra_objects):
registry_dict["infra"].append(
self._message_to_sorted_dict(infra_object.to_proto())
)
return registry_dict

@staticmethod
def _message_to_sorted_dict(message: Message) -> Dict[str, Any]:
return json.loads(MessageToJson(message, sort_keys=True))

def _prepare_registry_for_changes(self):
"""Prepares the Registry for changes by refreshing the cache if necessary."""
try:
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/repo_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ def registry_dump(repo_config: RepoConfig, repo_path: Path):
"breaking changes in the future. No guarantees are made on this interface."
)
click.echo(f"{Style.BRIGHT}{Fore.YELLOW}{warning}{Style.RESET_ALL}")
click.echo(json.dumps(registry_dict, indent=2))
click.echo(json.dumps(registry_dict, indent=2, sort_keys=True))


def cli_check_repo(repo_path: Path):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ def test_write_to_online_store_event_check(local_redis_environment):


@pytest.mark.integration
@pytest.mark.universal
def test_write_to_online_store(environment, universal_data_sources):
fs = environment.feature_store
entities, datasets, data_sources = universal_data_sources
Expand Down
13 changes: 6 additions & 7 deletions sdk/python/tests/integration/registration/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from tests.integration.feature_repos.integration_test_repo_config import (
IntegrationTestRepoConfig,
)
from tests.integration.feature_repos.repo_configuration import FULL_REPO_CONFIGS
from tests.integration.feature_repos.repo_configuration import Environment
from tests.integration.feature_repos.universal.data_source_creator import (
DataSourceCreator,
)
Expand All @@ -32,16 +32,16 @@


@pytest.mark.integration
@pytest.mark.parametrize("test_repo_config", FULL_REPO_CONFIGS)
def test_universal_cli(test_repo_config) -> None:
@pytest.mark.universal
def test_universal_cli(environment: Environment):
project = f"test_universal_cli_{str(uuid.uuid4()).replace('-', '')[:8]}"
runner = CliRunner()

with tempfile.TemporaryDirectory() as repo_dir_name:
try:
repo_path = Path(repo_dir_name)
feature_store_yaml = make_feature_store_yaml(
project, test_repo_config, repo_path
project, environment.test_repo_config, repo_path
)

repo_config = repo_path / "feature_store.yaml"
Expand All @@ -56,10 +56,9 @@ def test_universal_cli(test_repo_config) -> None:
# Store registry contents, to be compared later.
fs = FeatureStore(repo_path=str(repo_path))
registry_dict = fs.registry.to_dict(project=project)

# Save only the specs, not the metadata.
registry_specs = {
key: [fco["spec"] for fco in value]
key: [fco["spec"] if "spec" in fco else fco for fco in value]
adchia marked this conversation as resolved.
Show resolved Hide resolved
for key, value in registry_dict.items()
}

Expand Down Expand Up @@ -105,7 +104,7 @@ def test_universal_cli(test_repo_config) -> None:
registry_dict = fs.registry.to_dict(project=project)
assertpy.assert_that(registry_specs).is_equal_to(
{
key: [fco["spec"] for fco in value]
key: [fco["spec"] if "spec" in fco else fco for fco in value]
for key, value in registry_dict.items()
}
)
Expand Down