diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index dc4d4262b6..8d8344d8eb 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -56,7 +56,7 @@ from feast.registry import Registry from feast.repo_config import RepoConfig, load_repo_config from feast.type_map import python_value_to_proto_value -from feast.usage import log_exceptions, log_exceptions_and_usage +from feast.usage import UsageEvent, log_event, log_exceptions, log_exceptions_and_usage from feast.value_type import ValueType from feast.version import get_version @@ -387,6 +387,9 @@ def apply( ): raise ExperimentalFeatureNotEnabled(flags.FLAG_ON_DEMAND_TRANSFORM_NAME) + if len(odfvs_to_update) > 0: + log_event(UsageEvent.APPLY_WITH_ODFV) + _validate_feature_views(views_to_update) entities_to_update = [ob for ob in objects if isinstance(ob, Entity)] services_to_update = [ob for ob in objects if isinstance(ob, FeatureService)] @@ -552,6 +555,8 @@ def get_historical_features( ) feature_views = list(view for view, _ in fvs) on_demand_feature_views = list(view for view, _ in odfvs) + if len(on_demand_feature_views) > 0: + log_event(UsageEvent.GET_HISTORICAL_FEATURES_WITH_ODFV) # Check that the right request data is present in the entity_df if type(entity_df) == pd.DataFrame: @@ -810,6 +815,9 @@ def get_online_features( grouped_refs, grouped_odfv_refs = _group_feature_refs( _feature_refs, all_feature_views, all_on_demand_feature_views ) + if len(grouped_odfv_refs) > 0: + log_event(UsageEvent.GET_ONLINE_FEATURES_WITH_ODFV) + feature_views = list(view for view, _ in grouped_refs) entityless_case = DUMMY_ENTITY_NAME in [ entity_name @@ -877,46 +885,16 @@ def get_online_features( feature_name ] = GetOnlineFeaturesResponse.FieldStatus.PRESENT - # Note: each "table" is a feature view for table, requested_features in grouped_refs: - entity_keys = _get_table_entity_keys( - table, union_of_entity_keys, entity_name_to_join_key_map - ) - read_rows = provider.online_read( - config=self.config, - table=table, - entity_keys=entity_keys, - requested_features=requested_features, + self._populate_result_rows_from_feature_view( + entity_name_to_join_key_map, + full_feature_names, + provider, + requested_features, + result_rows, + table, + union_of_entity_keys, ) - # Each row is a set of features for a given entity key - for row_idx, read_row in enumerate(read_rows): - row_ts, feature_data = read_row - result_row = result_rows[row_idx] - - if feature_data is None: - for feature_name in requested_features: - feature_ref = ( - f"{table.name}__{feature_name}" - if full_feature_names - else feature_name - ) - result_row.statuses[ - feature_ref - ] = GetOnlineFeaturesResponse.FieldStatus.NOT_FOUND - else: - for feature_name in feature_data: - feature_ref = ( - f"{table.name}__{feature_name}" - if full_feature_names - else feature_name - ) - if feature_name in requested_features: - result_row.fields[feature_ref].CopyFrom( - feature_data[feature_name] - ) - result_row.statuses[ - feature_ref - ] = GetOnlineFeaturesResponse.FieldStatus.PRESENT initial_response = OnlineResponse( GetOnlineFeaturesResponse(field_values=result_rows) @@ -925,6 +903,55 @@ def get_online_features( _feature_refs, full_feature_names, initial_response, result_rows ) + def _populate_result_rows_from_feature_view( + self, + entity_name_to_join_key_map: Dict[str, str], + full_feature_names: bool, + provider: Provider, + requested_features: List[str], + result_rows: List[GetOnlineFeaturesResponse.FieldValues], + table: FeatureView, + union_of_entity_keys: List[EntityKeyProto], + ): + entity_keys = _get_table_entity_keys( + table, union_of_entity_keys, entity_name_to_join_key_map + ) + read_rows = provider.online_read( + config=self.config, + table=table, + entity_keys=entity_keys, + requested_features=requested_features, + ) + # Each row is a set of features for a given entity key + for row_idx, read_row in enumerate(read_rows): + row_ts, feature_data = read_row + result_row = result_rows[row_idx] + + if feature_data is None: + for feature_name in requested_features: + feature_ref = ( + f"{table.name}__{feature_name}" + if full_feature_names + else feature_name + ) + result_row.statuses[ + feature_ref + ] = GetOnlineFeaturesResponse.FieldStatus.NOT_FOUND + else: + for feature_name in feature_data: + feature_ref = ( + f"{table.name}__{feature_name}" + if full_feature_names + else feature_name + ) + if feature_name in requested_features: + result_row.fields[feature_ref].CopyFrom( + feature_data[feature_name] + ) + result_row.statuses[ + feature_ref + ] = GetOnlineFeaturesResponse.FieldStatus.PRESENT + def _get_needed_request_data_features(self, grouped_odfv_refs) -> Set[str]: needed_request_data_features = set() for odfv_to_feature_names in grouped_odfv_refs: diff --git a/sdk/python/feast/usage.py b/sdk/python/feast/usage.py index ba6e95f32d..b70b4f2c72 100644 --- a/sdk/python/feast/usage.py +++ b/sdk/python/feast/usage.py @@ -11,6 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import concurrent.futures +import enum import logging import os import sys @@ -28,6 +30,23 @@ USAGE_ENDPOINT = "https://usage.feast.dev" _logger = logging.getLogger(__name__) +executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) + + +@enum.unique +class UsageEvent(enum.Enum): + """ + An event meant to be logged + """ + + UNKNOWN = 0 + APPLY_WITH_ODFV = 1 + GET_HISTORICAL_FEATURES_WITH_ODFV = 2 + GET_ONLINE_FEATURES_WITH_ODFV = 3 + + def __str__(self): + return self.name.lower() + class Usage: def __init__(self): @@ -50,7 +69,7 @@ def check_env_and_configure(self): usage_filepath = join(feast_home_dir, "usage") self._is_test = os.getenv("FEAST_IS_USAGE_TEST", "False") == "True" - self._usage_counter = {"get_online_features": 0} + self._usage_counter = {} if os.path.exists(usage_filepath): with open(usage_filepath, "r") as f: @@ -73,14 +92,25 @@ def usage_id(self) -> Optional[str]: return os.getenv("FEAST_FORCE_USAGE_UUID") return self._usage_id - def log(self, function_name: str): + def _send_usage_request(self, json): + try: + future = executor.submit(requests.post, USAGE_ENDPOINT, json=json) + if self._is_test: + concurrent.futures.wait([future]) + except Exception as e: + if self._is_test: + raise e + else: + pass + + def log_function(self, function_name: str): self.check_env_and_configure() if self._usage_enabled and self.usage_id: - if function_name == "get_online_features": - self._usage_counter["get_online_features"] += 1 - if self._usage_counter["get_online_features"] % 10000 != 2: - return - self._usage_counter["get_online_features"] = 2 # avoid overflow + if ( + function_name == "get_online_features" + and not self.should_log_for_get_online_features_event(function_name) + ): + return json = { "function_name": function_name, "usage_id": self.usage_id, @@ -89,14 +119,35 @@ def log(self, function_name: str): "os": sys.platform, "is_test": self._is_test, } - try: - requests.post(USAGE_ENDPOINT, json=json) - except Exception as e: - if self._is_test: - raise e - else: - pass - return + self._send_usage_request(json) + + def should_log_for_get_online_features_event(self, event_name: str): + if event_name not in self._usage_counter: + self._usage_counter[event_name] = 0 + self._usage_counter[event_name] += 1 + if self._usage_counter[event_name] % 10000 != 2: + return False + self._usage_counter[event_name] = 2 # avoid overflow + return True + + def log_event(self, event: UsageEvent): + self.check_env_and_configure() + if self._usage_enabled and self.usage_id: + event_name = str(event) + if ( + event == UsageEvent.GET_ONLINE_FEATURES_WITH_ODFV + and not self.should_log_for_get_online_features_event(event_name) + ): + return + json = { + "event_name": event_name, + "usage_id": self.usage_id, + "timestamp": datetime.utcnow().isoformat(), + "version": get_version(), + "os": sys.platform, + "is_test": self._is_test, + } + self._send_usage_request(json) def log_exception(self, error_type: str, traceback: List[Tuple[str, int, str]]): self.check_env_and_configure() @@ -149,7 +200,7 @@ def log_exceptions_and_usage(func): def exception_logging_wrapper(*args, **kwargs): try: result = func(*args, **kwargs) - usage.log(func.__name__) + usage.log_function(func.__name__) except Exception as e: error_type = type(e).__name__ trace_to_log = [] @@ -170,6 +221,10 @@ def exception_logging_wrapper(*args, **kwargs): return exception_logging_wrapper +def log_event(event: UsageEvent): + usage.log_event(event) + + def _trim_filename(filename: str) -> str: return filename.split("/")[-1]