diff --git a/Makefile b/Makefile index f7fa7cdbee..daf6e8bad0 100644 --- a/Makefile +++ b/Makefile @@ -53,7 +53,7 @@ install-python: python -m pip install -e sdk/python -U --use-deprecated=legacy-resolver test-python: - FEAST_USAGE=False pytest -n 8 sdk/python/tests + FEAST_USAGE=False IS_TEST=True pytest -n 8 sdk/python/tests test-python-integration: FEAST_USAGE=False IS_TEST=True pytest -n 8 --integration sdk/python/tests diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index c7d961c2f0..960afc76f7 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -21,7 +21,7 @@ import pkg_resources import yaml -from feast import utils +from feast import flags, flags_helper, utils from feast.errors import FeastObjectNotFoundException, FeastProviderLoginError from feast.feature_store import FeatureStore from feast.repo_config import load_repo_config @@ -423,5 +423,112 @@ def serve_command(ctx: click.Context, port: int): store.serve(port) +@cli.group(name="alpha") +def alpha_cmd(): + """ + Access alpha features + """ + pass + + +@alpha_cmd.command("list") +@click.pass_context +def list_alpha_features(ctx: click.Context): + """ + Lists all alpha features + """ + repo = ctx.obj["CHDIR"] + cli_check_repo(repo) + repo_path = str(repo) + store = FeatureStore(repo_path=repo_path) + + flags_to_show = flags.FLAG_NAMES.copy() + flags_to_show.remove(flags.FLAG_ALPHA_FEATURES_NAME) + print("Alpha features:") + for flag in flags_to_show: + enabled_string = ( + "enabled" + if flags_helper.feature_flag_enabled(store.config, flag) + else "disabled" + ) + print(f"{flag}: {enabled_string}") + + +@alpha_cmd.command("enable-all") +@click.pass_context +def enable_alpha_features(ctx: click.Context): + """ + Enables all alpha features + """ + repo = ctx.obj["CHDIR"] + cli_check_repo(repo) + repo_path = str(repo) + store = FeatureStore(repo_path=repo_path) + + if store.config.flags is None: + store.config.flags = {} + for flag_name in flags.FLAG_NAMES: + store.config.flags[flag_name] = True + store.config.write_to_path(Path(repo_path)) + + +@alpha_cmd.command("enable") +@click.argument("name", type=click.STRING) +@click.pass_context +def enable_alpha_feature(ctx: click.Context, name: str): + """ + Enables an alpha feature + """ + if name not in flags.FLAG_NAMES: + raise ValueError(f"Flag name, {name}, not valid.") + + repo = ctx.obj["CHDIR"] + cli_check_repo(repo) + repo_path = str(repo) + store = FeatureStore(repo_path=repo_path) + + if store.config.flags is None: + store.config.flags = {} + store.config.flags[flags.FLAG_ALPHA_FEATURES_NAME] = True + store.config.flags[name] = True + store.config.write_to_path(Path(repo_path)) + + +@alpha_cmd.command("disable") +@click.argument("name", type=click.STRING) +@click.pass_context +def disable_alpha_feature(ctx: click.Context, name: str): + """ + Disables an alpha feature + """ + if name not in flags.FLAG_NAMES: + raise ValueError(f"Flag name, {name}, not valid.") + + repo = ctx.obj["CHDIR"] + cli_check_repo(repo) + repo_path = str(repo) + store = FeatureStore(repo_path=repo_path) + + if store.config.flags is None or name not in store.config.flags: + return + store.config.flags[name] = False + store.config.write_to_path(Path(repo_path)) + + +@alpha_cmd.command("disable-all") +@click.pass_context +def disable_alpha_features(ctx: click.Context): + """ + Disables all alpha features + """ + repo = ctx.obj["CHDIR"] + cli_check_repo(repo) + repo_path = str(repo) + store = FeatureStore(repo_path=repo_path) + + store.config.flags = None + store.config.write_to_path(Path(repo_path)) + + if __name__ == "__main__": cli() diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index cf85bdd08c..0d4fb929d9 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -258,3 +258,11 @@ def __init__(self, feature_view_name: str): super().__init__( f"The feature view name: {feature_view_name} refers to both an on-demand feature view and a feature view" ) + + +class ExperimentalFeatureNotEnabled(Exception): + def __init__(self, feature_flag_name: str): + super().__init__( + f"You are attempting to use an experimental feature that is not enabled. Please run " + f"`feast alpha enable {feature_flag_name}` " + ) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 586ff2f0bc..dc4d4262b6 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -22,11 +22,12 @@ from colorama import Fore, Style from tqdm import tqdm -from feast import feature_server, utils +from feast import feature_server, flags, flags_helper, utils from feast.data_source import RequestDataSource from feast.entity import Entity from feast.errors import ( EntityNotFoundException, + ExperimentalFeatureNotEnabled, FeatureNameCollisionError, FeatureViewNotFoundException, RequestDataNotFoundInEntityDfException, @@ -380,6 +381,12 @@ def apply( views_to_update = [ob for ob in objects if isinstance(ob, FeatureView)] odfvs_to_update = [ob for ob in objects if isinstance(ob, OnDemandFeatureView)] + if ( + not flags_helper.enable_on_demand_feature_views(self.config) + and len(odfvs_to_update) > 0 + ): + raise ExperimentalFeatureNotEnabled(flags.FLAG_ON_DEMAND_TRANSFORM_NAME) + _validate_feature_views(views_to_update) entities_to_update = [ob for ob in objects if isinstance(ob, Entity)] services_to_update = [ob for ob in objects if isinstance(ob, FeatureService)] @@ -986,6 +993,9 @@ def _augment_response_with_on_demand_transforms( @log_exceptions_and_usage def serve(self, port: int) -> None: """Start the feature consumption server locally on a given port.""" + if not flags_helper.enable_python_feature_server(self.config): + raise ExperimentalFeatureNotEnabled(flags.FLAG_PYTHON_FEATURE_SERVER_NAME) + feature_server.start_server(self, port) diff --git a/sdk/python/feast/flags.py b/sdk/python/feast/flags.py new file mode 100644 index 0000000000..67c50057cb --- /dev/null +++ b/sdk/python/feast/flags.py @@ -0,0 +1,10 @@ +FLAG_ALPHA_FEATURES_NAME = "alpha_features" +FLAG_ON_DEMAND_TRANSFORM_NAME = "on_demand_transforms" +FLAG_PYTHON_FEATURE_SERVER_NAME = "python_feature_server" +ENV_FLAG_IS_TEST = "IS_TEST" + +FLAG_NAMES = { + FLAG_ALPHA_FEATURES_NAME, + FLAG_ON_DEMAND_TRANSFORM_NAME, + FLAG_PYTHON_FEATURE_SERVER_NAME, +} diff --git a/sdk/python/feast/flags_helper.py b/sdk/python/feast/flags_helper.py new file mode 100644 index 0000000000..a8f3373e1e --- /dev/null +++ b/sdk/python/feast/flags_helper.py @@ -0,0 +1,39 @@ +import os + +from feast import flags +from feast.repo_config import RepoConfig + + +def _env_flag_enabled(name: str) -> bool: + return os.getenv(name, default="False") == "True" + + +def feature_flag_enabled(repo_config: RepoConfig, flag_name: str) -> bool: + if is_test(): + return True + return ( + _alpha_feature_flag_enabled(repo_config) + and repo_config.flags is not None + and flag_name in repo_config.flags + and repo_config.flags[flag_name] + ) + + +def _alpha_feature_flag_enabled(repo_config: RepoConfig) -> bool: + return ( + repo_config.flags is not None + and flags.FLAG_ALPHA_FEATURES_NAME in repo_config.flags + and repo_config.flags[flags.FLAG_ALPHA_FEATURES_NAME] + ) + + +def is_test() -> bool: + return _env_flag_enabled(flags.ENV_FLAG_IS_TEST) + + +def enable_on_demand_feature_views(repo_config: RepoConfig) -> bool: + return feature_flag_enabled(repo_config, flags.FLAG_ON_DEMAND_TRANSFORM_NAME) + + +def enable_python_feature_server(repo_config: RepoConfig) -> bool: + return feature_flag_enabled(repo_config, flags.FLAG_PYTHON_FEATURE_SERVER_NAME) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 0da4a329ec..c3c5096d96 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -1,4 +1,3 @@ -import os import uuid from datetime import date, datetime, timedelta from typing import Dict, List, Optional, Union @@ -10,6 +9,7 @@ from pydantic.typing import Literal from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed +from feast import flags_helper from feast.data_source import DataSource from feast.errors import ( BigQueryJobCancelled, @@ -270,8 +270,7 @@ def block_until_done( """ # For test environments, retry more aggressively - is_test = os.getenv("IS_TEST", default="False") == "True" - if is_test: + if flags_helper.is_test(): retry_cadence = 0.1 def _wait_until_done(bq_job): diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index b2a2d913ea..35becb5641 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -13,6 +13,7 @@ from pydantic.error_wrappers import ErrorWrapper from pydantic.typing import Dict, Optional, Union +from feast import flags from feast.errors import ( FeastFeatureServerTypeInvalidError, FeastFeatureServerTypeSetError, @@ -98,6 +99,9 @@ class RepoConfig(FeastBaseModel): feature_server: Optional[Any] """ FeatureServerConfig: Feature server configuration (optional depending on provider) """ + flags: Any + """ Flags: Feature flags for experimental features (optional) """ + repo_path: Optional[Path] = None def __init__(self, **data: Any): @@ -255,6 +259,35 @@ def _validate_project_name(cls, v): ) return v + @validator("flags") + def _validate_flags(cls, v): + if not isinstance(v, Dict): + return + + for flag_name, val in v.items(): + if flag_name not in flags.FLAG_NAMES: + raise ValueError(f"Flag name, {flag_name}, not valid.") + if type(val) is not bool: + raise ValueError(f"Flag value, {val}, not valid.") + + return v + + def write_to_path(self, repo_path: Path): + config_path = repo_path / "feature_store.yaml" + with open(config_path, mode="w") as f: + yaml.dump( + yaml.safe_load( + self.json( + exclude={"repo_path"}, + exclude_none=True, + exclude_unset=True, + exclude_defaults=True, + ) + ), + f, + sort_keys=False, + ) + class FeastConfigError(Exception): def __init__(self, error_message, config_path):