diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index e96ab772a6..66821781e2 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -11,11 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import base64 import json import logging -import os -import tempfile from datetime import datetime from pathlib import Path from typing import List, Optional @@ -28,18 +25,15 @@ from pygments import formatters, highlight, lexers from feast import utils -from feast.constants import ( - DEFAULT_FEATURE_TRANSFORMATION_SERVER_PORT, - FEATURE_STORE_YAML_ENV_NAME, -) +from feast.constants import DEFAULT_FEATURE_TRANSFORMATION_SERVER_PORT from feast.errors import FeastObjectNotFoundException, FeastProviderLoginError -from feast.feature_store import FeatureStore from feast.feature_view import FeatureView from feast.on_demand_feature_view import OnDemandFeatureView from feast.repo_config import load_repo_config from feast.repo_operations import ( apply_total, cli_check_repo, + create_feature_store, generate_project_name, init_repo, plan, @@ -172,10 +166,7 @@ def ui( """ Shows the Feast UI over the current directory """ - repo = ctx.obj["CHDIR"] - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) + store = create_feature_store(ctx) # Pass in the registry_dump method to get around a circular dependency store.serve_ui( host=host, @@ -192,10 +183,7 @@ def endpoint(ctx: click.Context): """ Display feature server endpoints """ - repo = ctx.obj["CHDIR"] - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) + store = create_feature_store(ctx) endpoint = store.get_feature_server_endpoint() if endpoint is not None: _logger.info( @@ -220,10 +208,7 @@ def data_source_describe(ctx: click.Context, name: str): """ Describe a data source """ - repo = ctx.obj["CHDIR"] - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) + store = create_feature_store(ctx) try: data_source = store.get_data_source(name) @@ -244,10 +229,7 @@ def data_source_list(ctx: click.Context): """ List all data sources """ - repo = ctx.obj["CHDIR"] - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) + store = create_feature_store(ctx) table = [] for datasource in store.list_data_sources(): table.append([datasource.name, datasource.__class__]) @@ -272,10 +254,7 @@ def entity_describe(ctx: click.Context, name: str): """ Describe an entity """ - repo = ctx.obj["CHDIR"] - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) + store = create_feature_store(ctx) try: entity = store.get_entity(name) @@ -296,10 +275,7 @@ def entity_list(ctx: click.Context): """ List all entities """ - repo = ctx.obj["CHDIR"] - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) + store = create_feature_store(ctx) table = [] for entity in store.list_entities(): table.append([entity.name, entity.description, entity.value_type]) @@ -324,10 +300,7 @@ def feature_service_describe(ctx: click.Context, name: str): """ Describe a feature service """ - repo = ctx.obj["CHDIR"] - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) + store = create_feature_store(ctx) try: feature_service = store.get_feature_service(name) @@ -350,10 +323,7 @@ def feature_service_list(ctx: click.Context): """ List all feature services """ - repo = ctx.obj["CHDIR"] - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) + store = create_feature_store(ctx) feature_services = [] for feature_service in store.list_feature_services(): feature_names = [] @@ -383,10 +353,7 @@ def feature_view_describe(ctx: click.Context, name: str): """ Describe a feature view """ - repo = ctx.obj["CHDIR"] - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) + store = create_feature_store(ctx) try: feature_view = store.get_feature_view(name) @@ -407,11 +374,7 @@ def feature_view_list(ctx: click.Context): """ List all feature views """ - repo = ctx.obj["CHDIR"] - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - - cli_check_repo(repo, fs_yaml_file) - store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) + store = create_feature_store(ctx) table = [] for feature_view in [ *store.list_feature_views(), @@ -452,10 +415,7 @@ def on_demand_feature_view_describe(ctx: click.Context, name: str): """ [Experimental] Describe an on demand feature view """ - repo = ctx.obj["CHDIR"] - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) + store = create_feature_store(ctx) try: on_demand_feature_view = store.get_on_demand_feature_view(name) @@ -478,10 +438,7 @@ def on_demand_feature_view_list(ctx: click.Context): """ [Experimental] List all on demand feature views """ - repo = ctx.obj["CHDIR"] - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) + store = create_feature_store(ctx) table = [] for on_demand_feature_view in store.list_on_demand_feature_views(): table.append([on_demand_feature_view.name]) @@ -583,10 +540,8 @@ def materialize_command( START_TS and END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01' """ - repo = ctx.obj["CHDIR"] - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) + store = create_feature_store(ctx) + store.materialize( feature_views=None if not views else views, start_date=utils.make_tzaware(parser.parse(start_ts)), @@ -612,10 +567,7 @@ def materialize_incremental_command(ctx: click.Context, end_ts: str, views: List END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01' """ - repo = ctx.obj["CHDIR"] - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) + store = create_feature_store(ctx) store.materialize_incremental( feature_views=None if not views else views, end_date=utils.make_tzaware(datetime.fromisoformat(end_ts)), @@ -707,22 +659,7 @@ def serve_command( no_feature_log: bool, ): """Start a feature server locally on a given port.""" - repo = ctx.obj["CHDIR"] - - # If we received a base64 encoded version of feature_store.yaml, use that - config_base64 = os.getenv(FEATURE_STORE_YAML_ENV_NAME) - if config_base64: - print("Received base64 encoded feature_store.yaml") - config_bytes = base64.b64decode(config_base64) - # Create a new unique directory for writing feature_store.yaml - repo_path = Path(tempfile.mkdtemp()) - with open(repo_path / "feature_store.yaml", "wb") as f: - f.write(config_bytes) - store = FeatureStore(repo_path=str(repo_path.resolve())) - else: - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) + store = create_feature_store(ctx) store.serve(host, port, type_, no_access_log, no_feature_log) @@ -738,10 +675,7 @@ def serve_command( @click.pass_context def serve_transformations_command(ctx: click.Context, port: int): """[Experimental] Start a feature consumption server locally on a given port.""" - repo = ctx.obj["CHDIR"] - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) + store = create_feature_store(ctx) store.serve_transformations(port) @@ -778,10 +712,7 @@ def validate( START_TS and END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01' """ - repo = ctx.obj["CHDIR"] - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) + store = create_feature_store(ctx) feature_service = store.get_feature_service(name=feature_service) reference = store.get_validation_reference(reference) diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index a66edc86cd..0cd425a46b 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -1,9 +1,11 @@ +import base64 import importlib import json import os import random import re import sys +import tempfile from importlib.abc import Loader from importlib.machinery import ModuleSpec from pathlib import Path @@ -14,6 +16,7 @@ from feast import PushSource from feast.batch_feature_view import BatchFeatureView +from feast.constants import FEATURE_STORE_YAML_ENV_NAME from feast.data_source import DataSource, KafkaSource, KinesisSource from feast.diff.registry_diff import extract_objects_for_keep_delete_update_add from feast.entity import Entity @@ -328,6 +331,27 @@ def log_infra_changes( ) +@log_exceptions_and_usage +def create_feature_store( + ctx: click.Context, +) -> FeatureStore: + repo = ctx.obj["CHDIR"] + # If we received a base64 encoded version of feature_store.yaml, use that + config_base64 = os.getenv(FEATURE_STORE_YAML_ENV_NAME) + if config_base64: + print("Received base64 encoded feature_store.yaml") + config_bytes = base64.b64decode(config_base64) + # Create a new unique directory for writing feature_store.yaml + repo_path = Path(tempfile.mkdtemp()) + with open(repo_path / "feature_store.yaml", "wb") as f: + f.write(config_bytes) + return FeatureStore(repo_path=str(repo_path.resolve())) + else: + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + return FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) + + @log_exceptions_and_usage def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation: bool): os.chdir(repo_path)