Skip to content

Commit

Permalink
fix: Added generic Feature store Creation for CLI (feast-dev#3618)
Browse files Browse the repository at this point in the history
Added generic Feature store Creation

Signed-off-by: Felix Wang <[email protected]>
Co-authored-by: Felix Wang <[email protected]>
  • Loading branch information
nj7 and felixwang9817 authored Jun 5, 2023
1 parent 059509a commit bf740d2
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 89 deletions.
109 changes: 20 additions & 89 deletions sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import json
import logging
import os
import tempfile
from datetime import datetime
from pathlib import Path
from typing import List, Optional
Expand All @@ -28,18 +25,15 @@
from pygments import formatters, highlight, lexers

from feast import utils
from feast.constants import (
DEFAULT_FEATURE_TRANSFORMATION_SERVER_PORT,
FEATURE_STORE_YAML_ENV_NAME,
)
from feast.constants import DEFAULT_FEATURE_TRANSFORMATION_SERVER_PORT
from feast.errors import FeastObjectNotFoundException, FeastProviderLoginError
from feast.feature_store import FeatureStore
from feast.feature_view import FeatureView
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.repo_config import load_repo_config
from feast.repo_operations import (
apply_total,
cli_check_repo,
create_feature_store,
generate_project_name,
init_repo,
plan,
Expand Down Expand Up @@ -172,10 +166,7 @@ def ui(
"""
Shows the Feast UI over the current directory
"""
repo = ctx.obj["CHDIR"]
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store = create_feature_store(ctx)
# Pass in the registry_dump method to get around a circular dependency
store.serve_ui(
host=host,
Expand All @@ -192,10 +183,7 @@ def endpoint(ctx: click.Context):
"""
Display feature server endpoints
"""
repo = ctx.obj["CHDIR"]
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store = create_feature_store(ctx)
endpoint = store.get_feature_server_endpoint()
if endpoint is not None:
_logger.info(
Expand All @@ -220,10 +208,7 @@ def data_source_describe(ctx: click.Context, name: str):
"""
Describe a data source
"""
repo = ctx.obj["CHDIR"]
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store = create_feature_store(ctx)

try:
data_source = store.get_data_source(name)
Expand All @@ -244,10 +229,7 @@ def data_source_list(ctx: click.Context):
"""
List all data sources
"""
repo = ctx.obj["CHDIR"]
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store = create_feature_store(ctx)
table = []
for datasource in store.list_data_sources():
table.append([datasource.name, datasource.__class__])
Expand All @@ -272,10 +254,7 @@ def entity_describe(ctx: click.Context, name: str):
"""
Describe an entity
"""
repo = ctx.obj["CHDIR"]
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store = create_feature_store(ctx)

try:
entity = store.get_entity(name)
Expand All @@ -296,10 +275,7 @@ def entity_list(ctx: click.Context):
"""
List all entities
"""
repo = ctx.obj["CHDIR"]
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store = create_feature_store(ctx)
table = []
for entity in store.list_entities():
table.append([entity.name, entity.description, entity.value_type])
Expand All @@ -324,10 +300,7 @@ def feature_service_describe(ctx: click.Context, name: str):
"""
Describe a feature service
"""
repo = ctx.obj["CHDIR"]
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store = create_feature_store(ctx)

try:
feature_service = store.get_feature_service(name)
Expand All @@ -350,10 +323,7 @@ def feature_service_list(ctx: click.Context):
"""
List all feature services
"""
repo = ctx.obj["CHDIR"]
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store = create_feature_store(ctx)
feature_services = []
for feature_service in store.list_feature_services():
feature_names = []
Expand Down Expand Up @@ -383,10 +353,7 @@ def feature_view_describe(ctx: click.Context, name: str):
"""
Describe a feature view
"""
repo = ctx.obj["CHDIR"]
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store = create_feature_store(ctx)

try:
feature_view = store.get_feature_view(name)
Expand All @@ -407,11 +374,7 @@ def feature_view_list(ctx: click.Context):
"""
List all feature views
"""
repo = ctx.obj["CHDIR"]
fs_yaml_file = ctx.obj["FS_YAML_FILE"]

cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store = create_feature_store(ctx)
table = []
for feature_view in [
*store.list_feature_views(),
Expand Down Expand Up @@ -452,10 +415,7 @@ def on_demand_feature_view_describe(ctx: click.Context, name: str):
"""
[Experimental] Describe an on demand feature view
"""
repo = ctx.obj["CHDIR"]
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store = create_feature_store(ctx)

try:
on_demand_feature_view = store.get_on_demand_feature_view(name)
Expand All @@ -478,10 +438,7 @@ def on_demand_feature_view_list(ctx: click.Context):
"""
[Experimental] List all on demand feature views
"""
repo = ctx.obj["CHDIR"]
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store = create_feature_store(ctx)
table = []
for on_demand_feature_view in store.list_on_demand_feature_views():
table.append([on_demand_feature_view.name])
Expand Down Expand Up @@ -583,10 +540,8 @@ def materialize_command(
START_TS and END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01'
"""
repo = ctx.obj["CHDIR"]
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store = create_feature_store(ctx)

store.materialize(
feature_views=None if not views else views,
start_date=utils.make_tzaware(parser.parse(start_ts)),
Expand All @@ -612,10 +567,7 @@ def materialize_incremental_command(ctx: click.Context, end_ts: str, views: List
END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01'
"""
repo = ctx.obj["CHDIR"]
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store = create_feature_store(ctx)
store.materialize_incremental(
feature_views=None if not views else views,
end_date=utils.make_tzaware(datetime.fromisoformat(end_ts)),
Expand Down Expand Up @@ -707,22 +659,7 @@ def serve_command(
no_feature_log: bool,
):
"""Start a feature server locally on a given port."""
repo = ctx.obj["CHDIR"]

# If we received a base64 encoded version of feature_store.yaml, use that
config_base64 = os.getenv(FEATURE_STORE_YAML_ENV_NAME)
if config_base64:
print("Received base64 encoded feature_store.yaml")
config_bytes = base64.b64decode(config_base64)
# Create a new unique directory for writing feature_store.yaml
repo_path = Path(tempfile.mkdtemp())
with open(repo_path / "feature_store.yaml", "wb") as f:
f.write(config_bytes)
store = FeatureStore(repo_path=str(repo_path.resolve()))
else:
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store = create_feature_store(ctx)

store.serve(host, port, type_, no_access_log, no_feature_log)

Expand All @@ -738,10 +675,7 @@ def serve_command(
@click.pass_context
def serve_transformations_command(ctx: click.Context, port: int):
"""[Experimental] Start a feature consumption server locally on a given port."""
repo = ctx.obj["CHDIR"]
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store = create_feature_store(ctx)

store.serve_transformations(port)

Expand Down Expand Up @@ -778,10 +712,7 @@ def validate(
START_TS and END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01'
"""
repo = ctx.obj["CHDIR"]
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store = create_feature_store(ctx)

feature_service = store.get_feature_service(name=feature_service)
reference = store.get_validation_reference(reference)
Expand Down
24 changes: 24 additions & 0 deletions sdk/python/feast/repo_operations.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import base64
import importlib
import json
import os
import random
import re
import sys
import tempfile
from importlib.abc import Loader
from importlib.machinery import ModuleSpec
from pathlib import Path
Expand All @@ -14,6 +16,7 @@

from feast import PushSource
from feast.batch_feature_view import BatchFeatureView
from feast.constants import FEATURE_STORE_YAML_ENV_NAME
from feast.data_source import DataSource, KafkaSource, KinesisSource
from feast.diff.registry_diff import extract_objects_for_keep_delete_update_add
from feast.entity import Entity
Expand Down Expand Up @@ -328,6 +331,27 @@ def log_infra_changes(
)


@log_exceptions_and_usage
def create_feature_store(
ctx: click.Context,
) -> FeatureStore:
repo = ctx.obj["CHDIR"]
# If we received a base64 encoded version of feature_store.yaml, use that
config_base64 = os.getenv(FEATURE_STORE_YAML_ENV_NAME)
if config_base64:
print("Received base64 encoded feature_store.yaml")
config_bytes = base64.b64decode(config_base64)
# Create a new unique directory for writing feature_store.yaml
repo_path = Path(tempfile.mkdtemp())
with open(repo_path / "feature_store.yaml", "wb") as f:
f.write(config_bytes)
return FeatureStore(repo_path=str(repo_path.resolve()))
else:
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
return FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)


@log_exceptions_and_usage
def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation: bool):
os.chdir(repo_path)
Expand Down

0 comments on commit bf740d2

Please sign in to comment.