From ed51d9bac3c6829376a493a78fde1159755e909d Mon Sep 17 00:00:00 2001 From: Terence Date: Thu, 25 Jun 2020 11:58:20 +0800 Subject: [PATCH 01/18] Add native types for get_online_features --- sdk/python/feast/client.py | 45 ++++++++++-- sdk/python/feast/response.py | 54 ++++++++++++++ sdk/python/feast/type_map.py | 33 ++++++++- tests/e2e/redis/basic-ingest-redis-serving.py | 71 +++++++++++++++++++ 4 files changed, 198 insertions(+), 5 deletions(-) create mode 100644 sdk/python/feast/response.py diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index f4c822e018..0898348bc2 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -22,7 +22,7 @@ import uuid from collections import OrderedDict from math import ceil -from typing import Dict, List, Optional, Tuple, Union +from typing import Any, Dict, List, Optional, Tuple, Union import grpc import pandas as pd @@ -87,6 +87,9 @@ ) from feast.serving.ServingService_pb2_grpc import ServingServiceStub from tensorflow_metadata.proto.v0 import statistics_pb2 +from feast.type_map import python_type_to_feast_value_type, _python_value_to_proto_value +from feast.response import OnlineResponse + _logger = logging.getLogger(__name__) @@ -655,10 +658,10 @@ def get_batch_features( def get_online_features( self, feature_refs: List[str], - entity_rows: List[GetOnlineFeaturesRequest.EntityRow], + entity_rows: List[Union[GetOnlineFeaturesRequest.EntityRow, Dict[str, Any]]], project: Optional[str] = None, omit_entities: bool = False, - ) -> GetOnlineFeaturesResponse: + ) -> OnlineResponse: """ Retrieves the latest online feature data from Feast Serving @@ -668,9 +671,13 @@ def get_online_features( "feature_set:feature" where "feature_set" & "feature" refer to the feature and feature set names respectively. Only the feature name is required. - entity_rows: List of GetFeaturesRequest.EntityRow where each row + entity_rows: + List of GetFeaturesRequest.EntityRow where each row contains entities. Timestamp should not be set for online retrieval. All entity types within a feature + OR + List of Dict[str, Union[bool,bytes,float,int,str,List[bool,bytes,float,int,str]]]] + where each key represents the entity name and value is feast.types.Value in Python native form. project: Specifies the project which contain the FeatureSets which the requested features belong to. omit_entities: If true will omit entity values in the returned feature data. @@ -682,6 +689,8 @@ def get_online_features( """ try: + if entity_rows and isinstance(entity_rows[0], dict): + entity_rows = _infer_entity_rows(entity_rows) response = self._serving_service.GetOnlineFeatures( GetOnlineFeaturesRequest( omit_entities_in_response=omit_entities, @@ -725,6 +734,8 @@ def get_online_features( except grpc.RpcError as e: raise grpc.RpcError(e.details()) + response = OnlineResponse(response) + return response def list_ingest_jobs( @@ -993,6 +1004,32 @@ def _get_grpc_metadata(self): return () +def _infer_entity_rows( + entities: List[Dict[str, Any]] +) -> List[GetOnlineFeaturesRequest.EntityRow]: + """ + Builds a list of EntityRow protos from Python native type format passed by user. + + Args: + entities: List of Dict[str, Union[bool,bytes,float,int,str,List[bool,bytes,float,int,str]]]] + where each key represents the entity name and value is feast.types.Value in Python native form. + + Returns: + A list of EntityRow protos parsed from args. + """ + entity_row_list = [] + + for entity in entities: + for key, value in entity.items(): + # Infer the specific type for this row + current_dtype = python_type_to_feast_value_type(name=key, value=value) + proto_value = _python_value_to_proto_value(current_dtype, value) + entity_row_list.append( + GetOnlineFeaturesRequest.EntityRow(fields={key: proto_value}) + ) + return entity_row_list + + def _build_feature_references( feature_ref_strs: List[str], project: Optional[str] = None ) -> List[FeatureReference]: diff --git a/sdk/python/feast/response.py b/sdk/python/feast/response.py new file mode 100644 index 0000000000..51ef90d97b --- /dev/null +++ b/sdk/python/feast/response.py @@ -0,0 +1,54 @@ +# Copyright 2020 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any, Dict + +from feast.serving.ServingService_pb2 import GetOnlineFeaturesResponse +from feast.type_map import feast_value_type_to_python_type + + +class OnlineResponse: + """ + Defines a online response in feast. + """ + + def __init__(self, online_response_proto: GetOnlineFeaturesResponse): + """ + Construct a native online response from its protobuf version. + + Args: + online_response_proto: GetOnlineResponse proto object to construct from. + """ + self.proto = online_response_proto + + @property + def field_values(self): + """ + Getter for GetOnlineResponse's field_values. + """ + return self.proto.field_values + + def to_dict(self) -> Dict[str, Any]: + """ + Converts GetOnlineFeaturesResponse features into a dictionary form. + """ + features = [k for row in self.field_values for k, _ in row.fields.items()] + features_dict = dict.fromkeys(features) + + for row in self.field_values: + for feature in features_dict.keys(): + native_type_value = feast_value_type_to_python_type(row.fields[feature]) + features_dict[feature] = native_type_value + + return features_dict diff --git a/sdk/python/feast/type_map.py b/sdk/python/feast/type_map.py index 85def25fcb..d26968302c 100644 --- a/sdk/python/feast/type_map.py +++ b/sdk/python/feast/type_map.py @@ -13,12 +13,13 @@ # limitations under the License. from datetime import datetime, timezone -from typing import List +from typing import Any, List import numpy as np import pandas as pd import pyarrow as pa from google.protobuf.timestamp_pb2 import Timestamp +from google.protobuf.json_format import MessageToDict from pyarrow.lib import TimestampType from feast.constants import DATETIME_COLUMN @@ -38,6 +39,36 @@ from feast.value_type import ValueType +def feast_value_type_to_python_type(field_value_proto: ProtoValue) -> Any: + """ + Converts field value Proto to Dict and returns each field's Feast Value Type value + in their respective Python value. + + Args: + field_value_proto: Field value Proto + + Returns: + Python native type based on Feast Value Type + """ + field_value_dict = MessageToDict(field_value_proto) + + for k, v in field_value_dict.items(): + if k == "int64Val": + return int(v) + if (k == "int64ListVal") or (k == "int32ListVal"): + return [int(item) for item in v["val"]] + if (k == "floatListVal") or (k == "doubleListVal"): + return [float(item) for item in v["val"]] + if k == "stringListVal": + return [str(item) for item in v["val"]] + if k == "bytesListVal": + return [bytes(item) for item in v["val"]] + if k == "boolListVal": + return [bool(item) for item in v["val"]] + + return v + + def python_type_to_feast_value_type( name: str, value, recurse: bool = True ) -> ValueType: diff --git a/tests/e2e/redis/basic-ingest-redis-serving.py b/tests/e2e/redis/basic-ingest-redis-serving.py index 618cc45d9a..e084ca94e4 100644 --- a/tests/e2e/redis/basic-ingest-redis-serving.py +++ b/tests/e2e/redis/basic-ingest-redis-serving.py @@ -741,6 +741,77 @@ def test_list_entities_and_features(client): assert set(filter_by_project_entity_expected) == set(filter_by_project_entity_actual) assert set(filter_by_project_labels_expected) == set(filter_by_project_labels_actual) + +@pytest.fixture(scope='module') +def list_type_dataframe(): + N_ROWS = 2 + time_offset = datetime.utcnow().replace(tzinfo=pytz.utc) + customer_df = pd.DataFrame( + { + "datetime": [time_offset] * N_ROWS, + "customer_id": [i for i in range(N_ROWS)], + "rating": [i for i in range(N_ROWS)], + "cost": [float(i)+0.5 for i in range(N_ROWS)], + "past_transactions_int": [[i,i+2] for i in range(N_ROWS)], + "past_transactions_double": [[float(i)+0.5,float(i)+2] for i in range(N_ROWS)], + "past_transactions_float": [[float(i)+0.5,float(i)+2] for i in range(N_ROWS)], + "past_transactions_string": [['first_'+str(i),'second_'+str(i)] for i in range(N_ROWS)], + "past_transactions_bool": [[True,False] for _ in range(N_ROWS)] + } + ) + return customer_df + + +@pytest.mark.timeout(600) +@pytest.mark.run(order=43) +def test_basic_retrieve_online_dict(client, list_type_dataframe): + customer_fs = FeatureSet( + name="customer2", + features=[ + Feature(name="rating", dtype=ValueType.INT64, labels={"key1":"val1"}), + Feature(name="cost", dtype=ValueType.FLOAT), + Feature(name="past_transactions_int", dtype=ValueType.INT64_LIST), + Feature(name="past_transactions_double", dtype=ValueType.DOUBLE_LIST), + Feature(name="past_transactions_float", dtype=ValueType.FLOAT_LIST), + Feature(name="past_transactions_string", dtype=ValueType.STRING_LIST), + Feature(name="past_transactions_bool", dtype=ValueType.BOOL_LIST) + ], + entities=[Entity("customer_id", ValueType.INT64)], + max_age=Duration(seconds=600) + ) + + client.set_project(PROJECT_NAME) + client.apply(customer_fs) + + customer_fs = client.get_feature_set(name="customer2") + client.ingest(customer_fs, list_type_dataframe, timeout=300) + time.sleep(15) + + online_request_entity = [{"customer_id": 1}] + online_request_features = [ + "rating", + "cost", + "past_transactions_int", + "past_transactions_double", + "past_transactions_float", + "past_transactions_string", + "past_transactions_bool" + ] + + online_features_actual = client.get_online_features(entity_rows=online_request_entity, feature_refs=online_request_features) + online_features_expected = { + "customer_id": 1, + "rating": 1, + "cost": 1.5, + "past_transactions_int": [1,3], + "past_transactions_double": [1.5,3.0], + "past_transactions_float": [1.5,3.0], + "past_transactions_string": ['first_1','second_1'], + "past_transactions_bool": [True,False] + } + assert online_features_actual.to_dict() == online_features_expected + + @pytest.mark.timeout(900) @pytest.mark.run(order=50) def test_sources_deduplicate_ingest_jobs(client): From 3475cc2ff4370b25d406cdbd38e199ae805722ef Mon Sep 17 00:00:00 2001 From: Terence Date: Thu, 25 Jun 2020 17:32:57 +0800 Subject: [PATCH 02/18] Address PR comments --- sdk/python/feast/client.py | 32 +++- sdk/python/feast/response.py | 4 +- sdk/python/feast/type_map.py | 13 +- tests/e2e/redis/basic-ingest-redis-serving.py | 147 ++++++++++++++---- 4 files changed, 159 insertions(+), 37 deletions(-) diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index 0898348bc2..4d69388a63 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -23,6 +23,7 @@ from collections import OrderedDict from math import ceil from typing import Any, Dict, List, Optional, Tuple, Union +import warnings import grpc import pandas as pd @@ -95,6 +96,8 @@ CPU_COUNT = os.cpu_count() # type: int +warnings.simplefilter("always", DeprecationWarning) + class Client: """ @@ -687,7 +690,10 @@ def get_online_features( Each EntityRow provided will yield one record, which contains data fields with data value and field status metadata (if included). """ - + warnings.warn( + "entity_rows parameter will only be accepting Dict format from v0.7 onwards", + DeprecationWarning, + ) try: if entity_rows and isinstance(entity_rows[0], dict): entity_rows = _infer_entity_rows(entity_rows) @@ -1004,6 +1010,12 @@ def _get_grpc_metadata(self): return () +def _is_mixed_type(entity_list: List[Any]) -> bool: + iseq = iter(entity_list) + first_type = type(next(iseq)) + return False if all((type(x) is first_type) for x in iseq) else True + + def _infer_entity_rows( entities: List[Dict[str, Any]] ) -> List[GetOnlineFeaturesRequest.EntityRow]: @@ -1018,11 +1030,29 @@ def _infer_entity_rows( A list of EntityRow protos parsed from args. """ entity_row_list = [] + temp_dtype_storage = dict() + is_mixed_type = False for entity in entities: for key, value in entity.items(): + if isinstance(value, list): + is_mixed_type = _is_mixed_type(value) # Infer the specific type for this row current_dtype = python_type_to_feast_value_type(name=key, value=value) + + if is_mixed_type: + raise TypeError( + f"Input entity {key} of List type has mixed types and that is not allowed. " + ) + if key not in temp_dtype_storage: + temp_dtype_storage[key] = current_dtype + else: + if current_dtype == temp_dtype_storage[key]: + pass + else: + raise TypeError( + f"Input entity {key} has mixed types and that is not allowed. " + ) proto_value = _python_value_to_proto_value(current_dtype, value) entity_row_list.append( GetOnlineFeaturesRequest.EntityRow(fields={key: proto_value}) diff --git a/sdk/python/feast/response.py b/sdk/python/feast/response.py index 51ef90d97b..7f62180123 100644 --- a/sdk/python/feast/response.py +++ b/sdk/python/feast/response.py @@ -44,11 +44,11 @@ def to_dict(self) -> Dict[str, Any]: Converts GetOnlineFeaturesResponse features into a dictionary form. """ features = [k for row in self.field_values for k, _ in row.fields.items()] - features_dict = dict.fromkeys(features) + features_dict = {k: list() for k in features} for row in self.field_values: for feature in features_dict.keys(): native_type_value = feast_value_type_to_python_type(row.fields[feature]) - features_dict[feature] = native_type_value + features_dict[feature].append(native_type_value) return features_dict diff --git a/sdk/python/feast/type_map.py b/sdk/python/feast/type_map.py index d26968302c..3d0ad206a8 100644 --- a/sdk/python/feast/type_map.py +++ b/sdk/python/feast/type_map.py @@ -55,6 +55,8 @@ def feast_value_type_to_python_type(field_value_proto: ProtoValue) -> Any: for k, v in field_value_dict.items(): if k == "int64Val": return int(v) + if k == "bytesVal": + return bytes(v) if (k == "int64ListVal") or (k == "int32ListVal"): return [int(item) for item in v["val"]] if (k == "floatListVal") or (k == "doubleListVal"): @@ -66,7 +68,13 @@ def feast_value_type_to_python_type(field_value_proto: ProtoValue) -> Any: if k == "boolListVal": return [bool(item) for item in v["val"]] - return v + if k in ["int32Val", "floatVal", "doubleVal", "stringVal", "boolVal"]: + return v + else: + raise TypeError( + f"Casting to Python native type for type {k} failed. " + f"Type {k} not found" + ) def python_type_to_feast_value_type( @@ -87,6 +95,9 @@ def python_type_to_feast_value_type( """ type_name = type(value).__name__ + if isinstance(value, list): + type_name = "ndarray" + value = np.asarray(value) type_map = { "int": ValueType.INT64, diff --git a/tests/e2e/redis/basic-ingest-redis-serving.py b/tests/e2e/redis/basic-ingest-redis-serving.py index e084ca94e4..967a27290d 100644 --- a/tests/e2e/redis/basic-ingest-redis-serving.py +++ b/tests/e2e/redis/basic-ingest-redis-serving.py @@ -750,13 +750,32 @@ def list_type_dataframe(): { "datetime": [time_offset] * N_ROWS, "customer_id": [i for i in range(N_ROWS)], - "rating": [i for i in range(N_ROWS)], - "cost": [float(i)+0.5 for i in range(N_ROWS)], - "past_transactions_int": [[i,i+2] for i in range(N_ROWS)], - "past_transactions_double": [[float(i)+0.5,float(i)+2] for i in range(N_ROWS)], - "past_transactions_float": [[float(i)+0.5,float(i)+2] for i in range(N_ROWS)], - "past_transactions_string": [['first_'+str(i),'second_'+str(i)] for i in range(N_ROWS)], - "past_transactions_bool": [[True,False] for _ in range(N_ROWS)] + "customer2_rating": [i for i in range(N_ROWS)], + "customer2_cost": [float(i)+0.5 for i in range(N_ROWS)], + "customer2_past_transactions_int": [[i,i+2] for i in range(N_ROWS)], + "customer2_past_transactions_double": [[float(i)+0.5,float(i)+2] for i in range(N_ROWS)], + "customer2_past_transactions_float": [[float(i)+0.5,float(i)+2] for i in range(N_ROWS)], + "customer2_past_transactions_string": [['first_'+str(i),'second_'+str(i)] for i in range(N_ROWS)], + "customer2_past_transactions_bool": [[True,False] for _ in range(N_ROWS)] + } + ) + return customer_df + +@pytest.fixture(scope='module') +def entity_list_type_dataframe(): + N_ROWS = 2 + time_offset = datetime.utcnow().replace(tzinfo=pytz.utc) + customer_df = pd.DataFrame( + { + "datetime": [time_offset] * N_ROWS, + "district_ids": [[np.int64(i),np.int64(i+1),np.int64(i+2)] for i in range(N_ROWS)], + "district_rating": [i for i in range(N_ROWS)], + "district_cost": [float(i)+0.5 for i in range(N_ROWS)], + "district_past_transactions_int": [[i,i+2] for i in range(N_ROWS)], + "district_past_transactions_double": [[float(i)+0.5,float(i)+2] for i in range(N_ROWS)], + "district_past_transactions_float": [[float(i)+0.5,float(i)+2] for i in range(N_ROWS)], + "district_past_transactions_string": [['first_'+str(i),'second_'+str(i)] for i in range(N_ROWS)], + "district_past_transactions_bool": [[True,False] for _ in range(N_ROWS)] } ) return customer_df @@ -764,52 +783,114 @@ def list_type_dataframe(): @pytest.mark.timeout(600) @pytest.mark.run(order=43) -def test_basic_retrieve_online_dict(client, list_type_dataframe): +def test_basic_retrieve_online_dict(client, list_type_dataframe, entity_list_type_dataframe): + # Case 1: Multiple entities check customer_fs = FeatureSet( name="customer2", features=[ - Feature(name="rating", dtype=ValueType.INT64, labels={"key1":"val1"}), - Feature(name="cost", dtype=ValueType.FLOAT), - Feature(name="past_transactions_int", dtype=ValueType.INT64_LIST), - Feature(name="past_transactions_double", dtype=ValueType.DOUBLE_LIST), - Feature(name="past_transactions_float", dtype=ValueType.FLOAT_LIST), - Feature(name="past_transactions_string", dtype=ValueType.STRING_LIST), - Feature(name="past_transactions_bool", dtype=ValueType.BOOL_LIST) + Feature(name="customer2_rating", dtype=ValueType.INT64, labels={"key1":"val1"}), + Feature(name="customer2_cost", dtype=ValueType.FLOAT), + Feature(name="customer2_past_transactions_int", dtype=ValueType.INT64_LIST), + Feature(name="customer2_past_transactions_double", dtype=ValueType.DOUBLE_LIST), + Feature(name="customer2_past_transactions_float", dtype=ValueType.FLOAT_LIST), + Feature(name="customer2_past_transactions_string", dtype=ValueType.STRING_LIST), + Feature(name="customer2_past_transactions_bool", dtype=ValueType.BOOL_LIST) ], entities=[Entity("customer_id", ValueType.INT64)], - max_age=Duration(seconds=600) + max_age=Duration(seconds=3600) ) client.set_project(PROJECT_NAME) client.apply(customer_fs) customer_fs = client.get_feature_set(name="customer2") - client.ingest(customer_fs, list_type_dataframe, timeout=300) + client.ingest(customer_fs, list_type_dataframe, timeout=600) time.sleep(15) - online_request_entity = [{"customer_id": 1}] + online_request_entity = [{"customer_id": 0},{"customer_id": 1}] online_request_features = [ - "rating", - "cost", - "past_transactions_int", - "past_transactions_double", - "past_transactions_float", - "past_transactions_string", - "past_transactions_bool" + "customer2_rating", + "customer2_cost", + "customer2_past_transactions_int", + "customer2_past_transactions_double", + "customer2_past_transactions_float", + "customer2_past_transactions_string", + "customer2_past_transactions_bool" ] online_features_actual = client.get_online_features(entity_rows=online_request_entity, feature_refs=online_request_features) online_features_expected = { - "customer_id": 1, - "rating": 1, - "cost": 1.5, - "past_transactions_int": [1,3], - "past_transactions_double": [1.5,3.0], - "past_transactions_float": [1.5,3.0], - "past_transactions_string": ['first_1','second_1'], - "past_transactions_bool": [True,False] + "customer_id": [0,1], + "customer2_rating": [0,1], + "customer2_cost": [0.5,1.5], + "customer2_past_transactions_int": [[0,2],[1,3]], + "customer2_past_transactions_double": [[0.5,2.0],[1.5,3.0]], + "customer2_past_transactions_float": [[0.5,2.0],[1.5,3.0]], + "customer2_past_transactions_string": [['first_0','second_0'],['first_1','second_1']], + "customer2_past_transactions_bool": [[True,False],[True,False]] + } + + # Case 2: List entity check + district_fs = FeatureSet( + name="district", + features=[ + Feature(name="district_rating", dtype=ValueType.INT64, labels={"key1":"val1"}), + Feature(name="district_cost", dtype=ValueType.FLOAT), + Feature(name="district_past_transactions_int", dtype=ValueType.INT64_LIST), + Feature(name="district_past_transactions_double", dtype=ValueType.DOUBLE_LIST), + Feature(name="district_past_transactions_float", dtype=ValueType.FLOAT_LIST), + Feature(name="district_past_transactions_string", dtype=ValueType.STRING_LIST), + Feature(name="district_past_transactions_bool", dtype=ValueType.BOOL_LIST) + ], + entities=[Entity("district_ids", dtype=ValueType.INT64_LIST)], + max_age=Duration(seconds=3600) + ) + + client.set_project(PROJECT_NAME) + client.apply(district_fs) + + district_fs = client.get_feature_set(name="district") + client.ingest(district_fs, entity_list_type_dataframe, timeout=600) + time.sleep(15) + + online_request_entity2 = [{"district_ids": [np.int64(1),np.int64(2),np.int64(3)]}] + online_request_features2 = [ + "district_rating", + "district_cost", + "district_past_transactions_int", + "district_past_transactions_double", + "district_past_transactions_float", + "district_past_transactions_string", + "district_past_transactions_bool" + ] + + online_features_actual2 = client.get_online_features(entity_rows=online_request_entity2, feature_refs=online_request_features2) + online_features_expected2 = { + "district_ids": [[np.int64(1),np.int64(2),np.int64(3)]], + "district_rating": [1], + "district_cost": [1.5], + "district_past_transactions_int": [[1,3]], + "district_past_transactions_double": [[1.5,3.0]], + "district_past_transactions_float": [[1.5,3.0]], + "district_past_transactions_string": [['first_1','second_1']], + "district_past_transactions_bool": [[True,False]] } + assert online_features_actual.to_dict() == online_features_expected + assert online_features_actual2.to_dict() == online_features_expected2 + + # Case 3: Entity check with mixed types + with pytest.raises(TypeError) as excinfo: + online_request_entity3 = [{"customer_id": 0},{"customer_id": "error_pls"}] + online_features_actual3 = client.get_online_features(entity_rows=online_request_entity3, feature_refs=online_request_features) + + # Case 4: List entity check with mixed types + with pytest.raises(TypeError) as excinfo2: + online_request_entity4 = [{"district_ids": [np.int64(1),np.int64(2),True]}] + online_features_actual4 = client.get_online_features(entity_rows=online_request_entity4, feature_refs=online_request_features2) + + assert "Input entity customer_id has mixed types and that is not allowed." in str(excinfo.value) + assert "Input entity district_ids of List type has mixed types and that is not allowed." in str(excinfo2.value) @pytest.mark.timeout(900) From 07eed46cd08bfca102a9182b89dd78f3fd1523d7 Mon Sep 17 00:00:00 2001 From: Terence Date: Thu, 25 Jun 2020 18:40:34 +0800 Subject: [PATCH 03/18] Address PR comments --- sdk/python/feast/client.py | 19 ++----------------- tests/e2e/redis/basic-ingest-redis-serving.py | 4 ++-- 2 files changed, 4 insertions(+), 19 deletions(-) diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index 4d69388a63..44b1adac95 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -96,7 +96,7 @@ CPU_COUNT = os.cpu_count() # type: int -warnings.simplefilter("always", DeprecationWarning) +warnings.simplefilter("once", DeprecationWarning) class Client: @@ -1010,12 +1010,6 @@ def _get_grpc_metadata(self): return () -def _is_mixed_type(entity_list: List[Any]) -> bool: - iseq = iter(entity_list) - first_type = type(next(iseq)) - return False if all((type(x) is first_type) for x in iseq) else True - - def _infer_entity_rows( entities: List[Dict[str, Any]] ) -> List[GetOnlineFeaturesRequest.EntityRow]: @@ -1032,24 +1026,15 @@ def _infer_entity_rows( entity_row_list = [] temp_dtype_storage = dict() - is_mixed_type = False for entity in entities: for key, value in entity.items(): - if isinstance(value, list): - is_mixed_type = _is_mixed_type(value) # Infer the specific type for this row current_dtype = python_type_to_feast_value_type(name=key, value=value) - if is_mixed_type: - raise TypeError( - f"Input entity {key} of List type has mixed types and that is not allowed. " - ) if key not in temp_dtype_storage: temp_dtype_storage[key] = current_dtype else: - if current_dtype == temp_dtype_storage[key]: - pass - else: + if current_dtype != temp_dtype_storage[key]: raise TypeError( f"Input entity {key} has mixed types and that is not allowed. " ) diff --git a/tests/e2e/redis/basic-ingest-redis-serving.py b/tests/e2e/redis/basic-ingest-redis-serving.py index 967a27290d..89dd540e34 100644 --- a/tests/e2e/redis/basic-ingest-redis-serving.py +++ b/tests/e2e/redis/basic-ingest-redis-serving.py @@ -885,12 +885,12 @@ def test_basic_retrieve_online_dict(client, list_type_dataframe, entity_list_typ online_features_actual3 = client.get_online_features(entity_rows=online_request_entity3, feature_refs=online_request_features) # Case 4: List entity check with mixed types - with pytest.raises(TypeError) as excinfo2: + with pytest.raises(ValueError) as excinfo2: online_request_entity4 = [{"district_ids": [np.int64(1),np.int64(2),True]}] online_features_actual4 = client.get_online_features(entity_rows=online_request_entity4, feature_refs=online_request_features2) assert "Input entity customer_id has mixed types and that is not allowed." in str(excinfo.value) - assert "Input entity district_ids of List type has mixed types and that is not allowed." in str(excinfo2.value) + assert "Value \"True\" is of type not of type " in str(excinfo2.value) @pytest.mark.timeout(900) From 083aa0fcf7007e9912b11809bc0fe15dde45bc50 Mon Sep 17 00:00:00 2001 From: Terence Date: Fri, 26 Jun 2020 08:26:52 +0800 Subject: [PATCH 04/18] Fix lint --- sdk/python/feast/client.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index 3b8d7fe3df..4040f3ea3a 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -84,7 +84,6 @@ GetFeastServingInfoRequest, GetFeastServingInfoResponse, GetOnlineFeaturesRequest, - GetOnlineFeaturesResponse, ) from feast.serving.ServingService_pb2_grpc import ServingServiceStub from tensorflow_metadata.proto.v0 import statistics_pb2 From b461f4dea517de5e5ad7c9114e069081d9437810 Mon Sep 17 00:00:00 2001 From: Terence Date: Fri, 26 Jun 2020 10:47:29 +0800 Subject: [PATCH 05/18] Fix test order --- tests/e2e/redis/basic-ingest-redis-serving.py | 302 +++++++++--------- 1 file changed, 151 insertions(+), 151 deletions(-) diff --git a/tests/e2e/redis/basic-ingest-redis-serving.py b/tests/e2e/redis/basic-ingest-redis-serving.py index d0e747938f..3b03da4cf6 100644 --- a/tests/e2e/redis/basic-ingest-redis-serving.py +++ b/tests/e2e/redis/basic-ingest-redis-serving.py @@ -299,6 +299,157 @@ def try_get_features(): timeout_msg="Timed out trying to get online feature values") +@pytest.fixture(scope='module') +def list_type_dataframe(): + N_ROWS = 2 + time_offset = datetime.utcnow().replace(tzinfo=pytz.utc) + customer_df = pd.DataFrame( + { + "datetime": [time_offset] * N_ROWS, + "customer_id2": [i for i in range(N_ROWS)], + "customer2_rating": [i for i in range(N_ROWS)], + "customer2_cost": [float(i)+0.5 for i in range(N_ROWS)], + "customer2_past_transactions_int": [[i,i+2] for i in range(N_ROWS)], + "customer2_past_transactions_double": [[float(i)+0.5,float(i)+2] for i in range(N_ROWS)], + "customer2_past_transactions_float": [[float(i)+0.5,float(i)+2] for i in range(N_ROWS)], + "customer2_past_transactions_string": [['first_'+str(i),'second_'+str(i)] for i in range(N_ROWS)], + "customer2_past_transactions_bool": [[True,False] for _ in range(N_ROWS)] + } + ) + return customer_df + +@pytest.fixture(scope='module') +def entity_list_type_dataframe(): + N_ROWS = 2 + time_offset = datetime.utcnow().replace(tzinfo=pytz.utc) + customer_df = pd.DataFrame( + { + "datetime": [time_offset] * N_ROWS, + "district_ids": [[np.int64(i),np.int64(i+1),np.int64(i+2)] for i in range(N_ROWS)], + "district_rating": [i for i in range(N_ROWS)], + "district_cost": [float(i)+0.5 for i in range(N_ROWS)], + "district_past_transactions_int": [[i,i+2] for i in range(N_ROWS)], + "district_past_transactions_double": [[float(i)+0.5,float(i)+2] for i in range(N_ROWS)], + "district_past_transactions_float": [[float(i)+0.5,float(i)+2] for i in range(N_ROWS)], + "district_past_transactions_string": [['first_'+str(i),'second_'+str(i)] for i in range(N_ROWS)], + "district_past_transactions_bool": [[True,False] for _ in range(N_ROWS)] + } + ) + return customer_df + + +@pytest.mark.timeout(600) +@pytest.mark.run(order=14) +def test_basic_retrieve_online_dict(client, list_type_dataframe, entity_list_type_dataframe): + # Case 1: Multiple entities check + customer_fs = FeatureSet( + name="customer2", + features=[ + Feature(name="customer2_rating", dtype=ValueType.INT64), + Feature(name="customer2_cost", dtype=ValueType.FLOAT), + Feature(name="customer2_past_transactions_int", dtype=ValueType.INT64_LIST), + Feature(name="customer2_past_transactions_double", dtype=ValueType.DOUBLE_LIST), + Feature(name="customer2_past_transactions_float", dtype=ValueType.FLOAT_LIST), + Feature(name="customer2_past_transactions_string", dtype=ValueType.STRING_LIST), + Feature(name="customer2_past_transactions_bool", dtype=ValueType.BOOL_LIST) + ], + entities=[Entity("customer_id2", ValueType.INT64)], + max_age=Duration(seconds=3600) + ) + + client.set_project(PROJECT_NAME) + client.apply(customer_fs) + + customer_fs = client.get_feature_set(name="customer2") + client.ingest(customer_fs, list_type_dataframe, timeout=600) + time.sleep(15) + + online_request_entity = [{"customer_id2": 0},{"customer_id2": 1}] + online_request_features = [ + "customer2_rating", + "customer2_cost", + "customer2_past_transactions_int", + "customer2_past_transactions_double", + "customer2_past_transactions_float", + "customer2_past_transactions_string", + "customer2_past_transactions_bool" + ] + + online_features_actual = client.get_online_features(entity_rows=online_request_entity, feature_refs=online_request_features) + online_features_expected = { + "customer_id2": [0,1], + "customer2_rating": [0,1], + "customer2_cost": [0.5,1.5], + "customer2_past_transactions_int": [[0,2],[1,3]], + "customer2_past_transactions_double": [[0.5,2.0],[1.5,3.0]], + "customer2_past_transactions_float": [[0.5,2.0],[1.5,3.0]], + "customer2_past_transactions_string": [['first_0','second_0'],['first_1','second_1']], + "customer2_past_transactions_bool": [[True,False],[True,False]] + } + + # Case 2: List entity check + district_fs = FeatureSet( + name="district", + features=[ + Feature(name="district_rating", dtype=ValueType.INT64), + Feature(name="district_cost", dtype=ValueType.FLOAT), + Feature(name="district_past_transactions_int", dtype=ValueType.INT64_LIST), + Feature(name="district_past_transactions_double", dtype=ValueType.DOUBLE_LIST), + Feature(name="district_past_transactions_float", dtype=ValueType.FLOAT_LIST), + Feature(name="district_past_transactions_string", dtype=ValueType.STRING_LIST), + Feature(name="district_past_transactions_bool", dtype=ValueType.BOOL_LIST) + ], + entities=[Entity("district_ids", dtype=ValueType.INT64_LIST)], + max_age=Duration(seconds=3600) + ) + + client.set_project(PROJECT_NAME) + client.apply(district_fs) + + district_fs = client.get_feature_set(name="district") + client.ingest(district_fs, entity_list_type_dataframe, timeout=600) + time.sleep(15) + + online_request_entity2 = [{"district_ids": [np.int64(1),np.int64(2),np.int64(3)]}] + online_request_features2 = [ + "district_rating", + "district_cost", + "district_past_transactions_int", + "district_past_transactions_double", + "district_past_transactions_float", + "district_past_transactions_string", + "district_past_transactions_bool" + ] + + online_features_actual2 = client.get_online_features(entity_rows=online_request_entity2, feature_refs=online_request_features2) + online_features_expected2 = { + "district_ids": [[np.int64(1),np.int64(2),np.int64(3)]], + "district_rating": [1], + "district_cost": [1.5], + "district_past_transactions_int": [[1,3]], + "district_past_transactions_double": [[1.5,3.0]], + "district_past_transactions_float": [[1.5,3.0]], + "district_past_transactions_string": [['first_1','second_1']], + "district_past_transactions_bool": [[True,False]] + } + + assert online_features_actual.to_dict() == online_features_expected + assert online_features_actual2.to_dict() == online_features_expected2 + + # Case 3: Entity check with mixed types + with pytest.raises(TypeError) as excinfo: + online_request_entity3 = [{"customer_id": 0},{"customer_id": "error_pls"}] + online_features_actual3 = client.get_online_features(entity_rows=online_request_entity3, feature_refs=online_request_features) + + # Case 4: List entity check with mixed types + with pytest.raises(ValueError) as excinfo2: + online_request_entity4 = [{"district_ids": [np.int64(1),np.int64(2),True]}] + online_features_actual4 = client.get_online_features(entity_rows=online_request_entity4, feature_refs=online_request_features2) + + assert "Input entity customer_id has mixed types and that is not allowed." in str(excinfo.value) + assert "Value \"True\" is of type not of type " in str(excinfo2.value) + + @pytest.mark.timeout(300) @pytest.mark.run(order=19) def test_basic_ingest_jobs(client): @@ -742,157 +893,6 @@ def test_list_entities_and_features(client): assert set(filter_by_project_labels_expected) == set(filter_by_project_labels_actual) -@pytest.fixture(scope='module') -def list_type_dataframe(): - N_ROWS = 2 - time_offset = datetime.utcnow().replace(tzinfo=pytz.utc) - customer_df = pd.DataFrame( - { - "datetime": [time_offset] * N_ROWS, - "customer_id": [i for i in range(N_ROWS)], - "customer2_rating": [i for i in range(N_ROWS)], - "customer2_cost": [float(i)+0.5 for i in range(N_ROWS)], - "customer2_past_transactions_int": [[i,i+2] for i in range(N_ROWS)], - "customer2_past_transactions_double": [[float(i)+0.5,float(i)+2] for i in range(N_ROWS)], - "customer2_past_transactions_float": [[float(i)+0.5,float(i)+2] for i in range(N_ROWS)], - "customer2_past_transactions_string": [['first_'+str(i),'second_'+str(i)] for i in range(N_ROWS)], - "customer2_past_transactions_bool": [[True,False] for _ in range(N_ROWS)] - } - ) - return customer_df - -@pytest.fixture(scope='module') -def entity_list_type_dataframe(): - N_ROWS = 2 - time_offset = datetime.utcnow().replace(tzinfo=pytz.utc) - customer_df = pd.DataFrame( - { - "datetime": [time_offset] * N_ROWS, - "district_ids": [[np.int64(i),np.int64(i+1),np.int64(i+2)] for i in range(N_ROWS)], - "district_rating": [i for i in range(N_ROWS)], - "district_cost": [float(i)+0.5 for i in range(N_ROWS)], - "district_past_transactions_int": [[i,i+2] for i in range(N_ROWS)], - "district_past_transactions_double": [[float(i)+0.5,float(i)+2] for i in range(N_ROWS)], - "district_past_transactions_float": [[float(i)+0.5,float(i)+2] for i in range(N_ROWS)], - "district_past_transactions_string": [['first_'+str(i),'second_'+str(i)] for i in range(N_ROWS)], - "district_past_transactions_bool": [[True,False] for _ in range(N_ROWS)] - } - ) - return customer_df - - -@pytest.mark.timeout(600) -@pytest.mark.run(order=43) -def test_basic_retrieve_online_dict(client, list_type_dataframe, entity_list_type_dataframe): - # Case 1: Multiple entities check - customer_fs = FeatureSet( - name="customer2", - features=[ - Feature(name="customer2_rating", dtype=ValueType.INT64, labels={"key1":"val1"}), - Feature(name="customer2_cost", dtype=ValueType.FLOAT), - Feature(name="customer2_past_transactions_int", dtype=ValueType.INT64_LIST), - Feature(name="customer2_past_transactions_double", dtype=ValueType.DOUBLE_LIST), - Feature(name="customer2_past_transactions_float", dtype=ValueType.FLOAT_LIST), - Feature(name="customer2_past_transactions_string", dtype=ValueType.STRING_LIST), - Feature(name="customer2_past_transactions_bool", dtype=ValueType.BOOL_LIST) - ], - entities=[Entity("customer_id", ValueType.INT64)], - max_age=Duration(seconds=3600) - ) - - client.set_project(PROJECT_NAME) - client.apply(customer_fs) - - customer_fs = client.get_feature_set(name="customer2") - client.ingest(customer_fs, list_type_dataframe, timeout=600) - time.sleep(15) - - online_request_entity = [{"customer_id": 0},{"customer_id": 1}] - online_request_features = [ - "customer2_rating", - "customer2_cost", - "customer2_past_transactions_int", - "customer2_past_transactions_double", - "customer2_past_transactions_float", - "customer2_past_transactions_string", - "customer2_past_transactions_bool" - ] - - online_features_actual = client.get_online_features(entity_rows=online_request_entity, feature_refs=online_request_features) - online_features_expected = { - "customer_id": [0,1], - "customer2_rating": [0,1], - "customer2_cost": [0.5,1.5], - "customer2_past_transactions_int": [[0,2],[1,3]], - "customer2_past_transactions_double": [[0.5,2.0],[1.5,3.0]], - "customer2_past_transactions_float": [[0.5,2.0],[1.5,3.0]], - "customer2_past_transactions_string": [['first_0','second_0'],['first_1','second_1']], - "customer2_past_transactions_bool": [[True,False],[True,False]] - } - - # Case 2: List entity check - district_fs = FeatureSet( - name="district", - features=[ - Feature(name="district_rating", dtype=ValueType.INT64, labels={"key1":"val1"}), - Feature(name="district_cost", dtype=ValueType.FLOAT), - Feature(name="district_past_transactions_int", dtype=ValueType.INT64_LIST), - Feature(name="district_past_transactions_double", dtype=ValueType.DOUBLE_LIST), - Feature(name="district_past_transactions_float", dtype=ValueType.FLOAT_LIST), - Feature(name="district_past_transactions_string", dtype=ValueType.STRING_LIST), - Feature(name="district_past_transactions_bool", dtype=ValueType.BOOL_LIST) - ], - entities=[Entity("district_ids", dtype=ValueType.INT64_LIST)], - max_age=Duration(seconds=3600) - ) - - client.set_project(PROJECT_NAME) - client.apply(district_fs) - - district_fs = client.get_feature_set(name="district") - client.ingest(district_fs, entity_list_type_dataframe, timeout=600) - time.sleep(15) - - online_request_entity2 = [{"district_ids": [np.int64(1),np.int64(2),np.int64(3)]}] - online_request_features2 = [ - "district_rating", - "district_cost", - "district_past_transactions_int", - "district_past_transactions_double", - "district_past_transactions_float", - "district_past_transactions_string", - "district_past_transactions_bool" - ] - - online_features_actual2 = client.get_online_features(entity_rows=online_request_entity2, feature_refs=online_request_features2) - online_features_expected2 = { - "district_ids": [[np.int64(1),np.int64(2),np.int64(3)]], - "district_rating": [1], - "district_cost": [1.5], - "district_past_transactions_int": [[1,3]], - "district_past_transactions_double": [[1.5,3.0]], - "district_past_transactions_float": [[1.5,3.0]], - "district_past_transactions_string": [['first_1','second_1']], - "district_past_transactions_bool": [[True,False]] - } - - assert online_features_actual.to_dict() == online_features_expected - assert online_features_actual2.to_dict() == online_features_expected2 - - # Case 3: Entity check with mixed types - with pytest.raises(TypeError) as excinfo: - online_request_entity3 = [{"customer_id": 0},{"customer_id": "error_pls"}] - online_features_actual3 = client.get_online_features(entity_rows=online_request_entity3, feature_refs=online_request_features) - - # Case 4: List entity check with mixed types - with pytest.raises(ValueError) as excinfo2: - online_request_entity4 = [{"district_ids": [np.int64(1),np.int64(2),True]}] - online_features_actual4 = client.get_online_features(entity_rows=online_request_entity4, feature_refs=online_request_features2) - - assert "Input entity customer_id has mixed types and that is not allowed." in str(excinfo.value) - assert "Value \"True\" is of type not of type " in str(excinfo2.value) - - @pytest.mark.timeout(900) @pytest.mark.run(order=60) def test_sources_deduplicate_ingest_jobs(client): From 441fec7863d66707532066ea028f7fb2a21b18a2 Mon Sep 17 00:00:00 2001 From: Terence Date: Fri, 26 Jun 2020 13:46:43 +0800 Subject: [PATCH 06/18] Address PR comments --- sdk/python/feast/client.py | 8 +- sdk/python/feast/response.py | 4 +- sdk/python/feast/type_map.py | 9 ++- tests/e2e/redis/basic-ingest-redis-serving.py | 75 +++++++++++++------ 4 files changed, 62 insertions(+), 34 deletions(-) diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index 4040f3ea3a..c2e691a905 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -990,17 +990,17 @@ def _infer_entity_rows( A list of EntityRow protos parsed from args. """ entity_row_list = [] - temp_dtype_storage = dict() + entity_type_map = dict() for entity in entities: for key, value in entity.items(): # Infer the specific type for this row current_dtype = python_type_to_feast_value_type(name=key, value=value) - if key not in temp_dtype_storage: - temp_dtype_storage[key] = current_dtype + if key not in entity_type_map: + entity_type_map[key] = current_dtype else: - if current_dtype != temp_dtype_storage[key]: + if current_dtype != entity_type_map[key]: raise TypeError( f"Input entity {key} has mixed types and that is not allowed. " ) diff --git a/sdk/python/feast/response.py b/sdk/python/feast/response.py index 7f62180123..a4f7c2d447 100644 --- a/sdk/python/feast/response.py +++ b/sdk/python/feast/response.py @@ -43,8 +43,8 @@ def to_dict(self) -> Dict[str, Any]: """ Converts GetOnlineFeaturesResponse features into a dictionary form. """ - features = [k for row in self.field_values for k, _ in row.fields.items()] - features_dict = {k: list() for k in features} + fields = [k for row in self.field_values for k, _ in row.fields.items()] + features_dict = {k: list() for k in fields} for row in self.field_values: for feature in features_dict.keys(): diff --git a/sdk/python/feast/type_map.py b/sdk/python/feast/type_map.py index 3d0ad206a8..19ff41c058 100644 --- a/sdk/python/feast/type_map.py +++ b/sdk/python/feast/type_map.py @@ -48,7 +48,7 @@ def feast_value_type_to_python_type(field_value_proto: ProtoValue) -> Any: field_value_proto: Field value Proto Returns: - Python native type based on Feast Value Type + Python native type representation/version of the given field_value_proto """ field_value_dict = MessageToDict(field_value_proto) @@ -95,9 +95,6 @@ def python_type_to_feast_value_type( """ type_name = type(value).__name__ - if isinstance(value, list): - type_name = "ndarray" - value = np.asarray(value) type_map = { "int": ValueType.INT64, @@ -122,6 +119,10 @@ def python_type_to_feast_value_type( if type_name in type_map: return type_map[type_name] + if isinstance(value, list): + type_name = "ndarray" + value = np.asarray(value) + if type_name == "ndarray": if recurse: diff --git a/tests/e2e/redis/basic-ingest-redis-serving.py b/tests/e2e/redis/basic-ingest-redis-serving.py index 3b03da4cf6..71fd32b778 100644 --- a/tests/e2e/redis/basic-ingest-redis-serving.py +++ b/tests/e2e/redis/basic-ingest-redis-serving.py @@ -300,7 +300,8 @@ def try_get_features(): @pytest.fixture(scope='module') -def list_type_dataframe(): +def nonlist_entity_dataframe(): + # Dataframe setup for feature retrieval with entity provided not in list format N_ROWS = 2 time_offset = datetime.utcnow().replace(tzinfo=pytz.utc) customer_df = pd.DataFrame( @@ -319,7 +320,8 @@ def list_type_dataframe(): return customer_df @pytest.fixture(scope='module') -def entity_list_type_dataframe(): +def list_entity_dataframe(): + # Dataframe setup for feature retrieval with entity provided in list format N_ROWS = 2 time_offset = datetime.utcnow().replace(tzinfo=pytz.utc) customer_df = pd.DataFrame( @@ -338,10 +340,11 @@ def entity_list_type_dataframe(): return customer_df +@pytest.mark.justtesting @pytest.mark.timeout(600) @pytest.mark.run(order=14) -def test_basic_retrieve_online_dict(client, list_type_dataframe, entity_list_type_dataframe): - # Case 1: Multiple entities check +def test_basic_retrieve_online_entity_nonlistform(client, nonlist_entity_dataframe, list_entity_dataframe): + # Case 1: Feature retrieval with multiple entities retrieval check customer_fs = FeatureSet( name="customer2", features=[ @@ -361,7 +364,7 @@ def test_basic_retrieve_online_dict(client, list_type_dataframe, entity_list_typ client.apply(customer_fs) customer_fs = client.get_feature_set(name="customer2") - client.ingest(customer_fs, list_type_dataframe, timeout=600) + client.ingest(customer_fs, nonlist_entity_dataframe, timeout=600) time.sleep(15) online_request_entity = [{"customer_id2": 0},{"customer_id2": 1}] @@ -375,7 +378,16 @@ def test_basic_retrieve_online_dict(client, list_type_dataframe, entity_list_typ "customer2_past_transactions_bool" ] - online_features_actual = client.get_online_features(entity_rows=online_request_entity, feature_refs=online_request_features) + def try_get_features(): + response = client.get_online_features(entity_rows=online_request_entity, feature_refs=online_request_features) + return response, True + + online_features_actual = wait_retry_backoff( + retry_fn=try_get_features, + timeout_secs=90, + timeout_msg="Timed out trying to get online feature values" + ) + online_features_expected = { "customer_id2": [0,1], "customer2_rating": [0,1], @@ -387,7 +399,20 @@ def test_basic_retrieve_online_dict(client, list_type_dataframe, entity_list_typ "customer2_past_transactions_bool": [[True,False],[True,False]] } - # Case 2: List entity check + assert online_features_actual.to_dict() == online_features_expected + + # Case 2: Feature retrieval with multiple entities retrieval check with mixed types + with pytest.raises(TypeError) as excinfo: + online_request_entity2 = [{"customer_id": 0},{"customer_id": "error_pls"}] + online_features_actual2 = client.get_online_features(entity_rows=online_request_entity2, feature_refs=online_request_features) + + assert "Input entity customer_id has mixed types and that is not allowed." in str(excinfo.value) + + +@pytest.mark.timeout(600) +@pytest.mark.run(order=15) +def test_basic_retrieve_online_entity_listform(client, list_entity_dataframe): + # Case 1: Features retrieval with entity in list format check district_fs = FeatureSet( name="district", features=[ @@ -407,11 +432,11 @@ def test_basic_retrieve_online_dict(client, list_type_dataframe, entity_list_typ client.apply(district_fs) district_fs = client.get_feature_set(name="district") - client.ingest(district_fs, entity_list_type_dataframe, timeout=600) + client.ingest(district_fs, list_entity_dataframe, timeout=600) time.sleep(15) - online_request_entity2 = [{"district_ids": [np.int64(1),np.int64(2),np.int64(3)]}] - online_request_features2 = [ + online_request_entity = [{"district_ids": [np.int64(1),np.int64(2),np.int64(3)]}] + online_request_features = [ "district_rating", "district_cost", "district_past_transactions_int", @@ -421,8 +446,17 @@ def test_basic_retrieve_online_dict(client, list_type_dataframe, entity_list_typ "district_past_transactions_bool" ] - online_features_actual2 = client.get_online_features(entity_rows=online_request_entity2, feature_refs=online_request_features2) - online_features_expected2 = { + def try_get_features(): + response = client.get_online_features(entity_rows=online_request_entity, feature_refs=online_request_features) + return response, True + + online_features_actual = wait_retry_backoff( + retry_fn=try_get_features, + timeout_secs=90, + timeout_msg="Timed out trying to get online feature values" + ) + + online_features_expected = { "district_ids": [[np.int64(1),np.int64(2),np.int64(3)]], "district_rating": [1], "district_cost": [1.5], @@ -434,20 +468,13 @@ def test_basic_retrieve_online_dict(client, list_type_dataframe, entity_list_typ } assert online_features_actual.to_dict() == online_features_expected - assert online_features_actual2.to_dict() == online_features_expected2 - # Case 3: Entity check with mixed types - with pytest.raises(TypeError) as excinfo: - online_request_entity3 = [{"customer_id": 0},{"customer_id": "error_pls"}] - online_features_actual3 = client.get_online_features(entity_rows=online_request_entity3, feature_refs=online_request_features) - - # Case 4: List entity check with mixed types - with pytest.raises(ValueError) as excinfo2: - online_request_entity4 = [{"district_ids": [np.int64(1),np.int64(2),True]}] - online_features_actual4 = client.get_online_features(entity_rows=online_request_entity4, feature_refs=online_request_features2) + # Case 2: Features retrieval with entity in list format check with mixed types + with pytest.raises(ValueError) as excinfo: + online_request_entity2 = [{"district_ids": [np.int64(1),np.int64(2),True]}] + online_features_actual2 = client.get_online_features(entity_rows=online_request_entity2, feature_refs=online_request_features) - assert "Input entity customer_id has mixed types and that is not allowed." in str(excinfo.value) - assert "Value \"True\" is of type not of type " in str(excinfo2.value) + assert "Value \"True\" is of type not of type " in str(excinfo.value) @pytest.mark.timeout(300) From e6e56883aa9e248426c6e69b2b9babf2260a9585 Mon Sep 17 00:00:00 2001 From: Terence Date: Fri, 26 Jun 2020 18:44:08 +0800 Subject: [PATCH 07/18] Address PR comments --- sdk/python/feast/client.py | 25 ++++++---- sdk/python/feast/type_map.py | 49 +++++++++++++++---- tests/e2e/redis/basic-ingest-redis-serving.py | 3 +- 3 files changed, 56 insertions(+), 21 deletions(-) diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index c2e691a905..96071d6d2b 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -89,6 +89,7 @@ from tensorflow_metadata.proto.v0 import statistics_pb2 from feast.type_map import python_type_to_feast_value_type, _python_value_to_proto_value from feast.response import OnlineResponse +from feast.types.Value_pb2 import Value as Value _logger = logging.getLogger(__name__) @@ -994,17 +995,21 @@ def _infer_entity_rows( for entity in entities: for key, value in entity.items(): - # Infer the specific type for this row - current_dtype = python_type_to_feast_value_type(name=key, value=value) - - if key not in entity_type_map: - entity_type_map[key] = current_dtype + # Allow for feast.types.Value + if isinstance(value, Value): + proto_value = value else: - if current_dtype != entity_type_map[key]: - raise TypeError( - f"Input entity {key} has mixed types and that is not allowed. " - ) - proto_value = _python_value_to_proto_value(current_dtype, value) + # Infer the specific type for this row + current_dtype = python_type_to_feast_value_type(name=key, value=value) + + if key not in entity_type_map: + entity_type_map[key] = current_dtype + else: + if current_dtype != entity_type_map[key]: + raise TypeError( + f"Input entity {key} has mixed types and that is not allowed. " + ) + proto_value = _python_value_to_proto_value(current_dtype, value) entity_row_list.append( GetOnlineFeaturesRequest.EntityRow(fields={key: proto_value}) ) diff --git a/sdk/python/feast/type_map.py b/sdk/python/feast/type_map.py index 19ff41c058..e5ec9f96bd 100644 --- a/sdk/python/feast/type_map.py +++ b/sdk/python/feast/type_map.py @@ -119,11 +119,7 @@ def python_type_to_feast_value_type( if type_name in type_map: return type_map[type_name] - if isinstance(value, list): - type_name = "ndarray" - value = np.asarray(value) - - if type_name == "ndarray": + if type_name == "ndarray" or isinstance(value, list): if recurse: # Convert to list type @@ -132,10 +128,15 @@ def python_type_to_feast_value_type( # This is the final type which we infer from the list common_item_value_type = None for item in list_items: - # Get the type from the current item, only one level deep - current_item_value_type = python_type_to_feast_value_type( - name=name, value=item, recurse=False - ) + if isinstance(item, ProtoValue): + current_item_value_type = _proto_str_to_value_type( + item.WhichOneof("val") + ) + else: + # Get the type from the current item, only one level deep + current_item_value_type = python_type_to_feast_value_type( + name=name, value=item, recurse=False + ) # Validate whether the type stays consistent if ( common_item_value_type @@ -396,6 +397,36 @@ def _python_value_to_proto_value(feast_value_type, value) -> ProtoValue: raise Exception(f"Unsupported data type: ${str(type(value))}") +def _proto_str_to_value_type(proto_str: str) -> ValueType: + """ + Returns Feast ValueType given Feast ValueType string. + + Args: + proto_str: str + + Returns: + A variant of ValueType. + """ + type_map = { + "int32_val": ValueType.INT32, + "int64_val": ValueType.INT64, + "double_val": ValueType.DOUBLE, + "float_val": ValueType.FLOAT, + "string_val": ValueType.STRING, + "bytes_val": ValueType.BYTES, + "bool_val": ValueType.BOOL, + "int32_list_val": ValueType.INT32_LIST, + "int64_list_val": ValueType.INT64_LIST, + "double_list_val": ValueType.DOUBLE_LIST, + "float_list_val": ValueType.FLOAT_LIST, + "string_list_val": ValueType.STRING_LIST, + "bytes_list_val": ValueType.BYTES_LIST, + "bool_list_val": ValueType.BOOL_LIST, + } + + return type_map[proto_str] + + def pa_to_feast_value_attr(pa_type: object): """ Returns the equivalent Feast ValueType string for the given pa.lib type. diff --git a/tests/e2e/redis/basic-ingest-redis-serving.py b/tests/e2e/redis/basic-ingest-redis-serving.py index 71fd32b778..6a798ecc16 100644 --- a/tests/e2e/redis/basic-ingest-redis-serving.py +++ b/tests/e2e/redis/basic-ingest-redis-serving.py @@ -340,7 +340,6 @@ def list_entity_dataframe(): return customer_df -@pytest.mark.justtesting @pytest.mark.timeout(600) @pytest.mark.run(order=14) def test_basic_retrieve_online_entity_nonlistform(client, nonlist_entity_dataframe, list_entity_dataframe): @@ -474,7 +473,7 @@ def try_get_features(): online_request_entity2 = [{"district_ids": [np.int64(1),np.int64(2),True]}] online_features_actual2 = client.get_online_features(entity_rows=online_request_entity2, feature_refs=online_request_features) - assert "Value \"True\" is of type not of type " in str(excinfo.value) + assert "List value type for field district_ids is inconsistent. ValueType.INT64 different from ValueType.BOOL." in str(excinfo.value) @pytest.mark.timeout(300) From e951715c9520bc832fa38a941c810fe09cc7b0fc Mon Sep 17 00:00:00 2001 From: Terence Date: Fri, 26 Jun 2020 21:04:22 +0800 Subject: [PATCH 08/18] Address PR comments --- sdk/python/feast/client.py | 2 +- tests/e2e/redis/basic-ingest-redis-serving.py | 39 +++++++++++++++---- 2 files changed, 33 insertions(+), 8 deletions(-) diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index 96071d6d2b..2410e89b73 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -1007,7 +1007,7 @@ def _infer_entity_rows( else: if current_dtype != entity_type_map[key]: raise TypeError( - f"Input entity {key} has mixed types and that is not allowed. " + f"Input entity {key} has mixed types, {current_dtype} and {entity_type_map[key]}. That is not allowed. " ) proto_value = _python_value_to_proto_value(current_dtype, value) entity_row_list.append( diff --git a/tests/e2e/redis/basic-ingest-redis-serving.py b/tests/e2e/redis/basic-ingest-redis-serving.py index 6a798ecc16..f4875fdd7b 100644 --- a/tests/e2e/redis/basic-ingest-redis-serving.py +++ b/tests/e2e/redis/basic-ingest-redis-serving.py @@ -43,6 +43,7 @@ GetOnlineFeaturesResponse) from feast.type_map import ValueType from feast.types.Value_pb2 import Value as Value +from feast.types.Value_pb2 import Int64List FLOAT_TOLERANCE = 0.00001 PROJECT_NAME = 'basic_' + uuid.uuid4().hex.upper()[0:6] @@ -376,13 +377,24 @@ def test_basic_retrieve_online_entity_nonlistform(client, nonlist_entity_datafra "customer2_past_transactions_string", "customer2_past_transactions_bool" ] + online_request_entity2 = [{"customer_id2": Value(int64_val=0)},{"customer_id2": Value(int64_val=1)}] - def try_get_features(): + def try_get_features1(): response = client.get_online_features(entity_rows=online_request_entity, feature_refs=online_request_features) return response, True - online_features_actual = wait_retry_backoff( - retry_fn=try_get_features, + def try_get_features2(): + response = client.get_online_features(entity_rows=online_request_entity2, feature_refs=online_request_features) + return response, True + + online_features_actual1 = wait_retry_backoff( + retry_fn=try_get_features1, + timeout_secs=90, + timeout_msg="Timed out trying to get online feature values" + ) + + online_features_actual2 = wait_retry_backoff( + retry_fn=try_get_features2, timeout_secs=90, timeout_msg="Timed out trying to get online feature values" ) @@ -398,14 +410,15 @@ def try_get_features(): "customer2_past_transactions_bool": [[True,False],[True,False]] } - assert online_features_actual.to_dict() == online_features_expected + assert online_features_actual1.to_dict() == online_features_expected + assert online_features_actual2.to_dict() == online_features_expected # Case 2: Feature retrieval with multiple entities retrieval check with mixed types with pytest.raises(TypeError) as excinfo: online_request_entity2 = [{"customer_id": 0},{"customer_id": "error_pls"}] online_features_actual2 = client.get_online_features(entity_rows=online_request_entity2, feature_refs=online_request_features) - assert "Input entity customer_id has mixed types and that is not allowed." in str(excinfo.value) + assert "Input entity customer_id has mixed types, ValueType.STRING and ValueType.INT64. That is not allowed." in str(excinfo.value) @pytest.mark.timeout(600) @@ -444,13 +457,24 @@ def test_basic_retrieve_online_entity_listform(client, list_entity_dataframe): "district_past_transactions_string", "district_past_transactions_bool" ] + online_request_entity2 = [{"district_ids": Value(int64_list_val=Int64List(val=[1,2,3]))}] - def try_get_features(): + def try_get_features1(): response = client.get_online_features(entity_rows=online_request_entity, feature_refs=online_request_features) return response, True + def try_get_features2(): + response = client.get_online_features(entity_rows=online_request_entity2, feature_refs=online_request_features) + return response, True + online_features_actual = wait_retry_backoff( - retry_fn=try_get_features, + retry_fn=try_get_features1, + timeout_secs=90, + timeout_msg="Timed out trying to get online feature values" + ) + + online_features_actual2 = wait_retry_backoff( + retry_fn=try_get_features2, timeout_secs=90, timeout_msg="Timed out trying to get online feature values" ) @@ -467,6 +491,7 @@ def try_get_features(): } assert online_features_actual.to_dict() == online_features_expected + assert online_features_actual2.to_dict() == online_features_expected # Case 2: Features retrieval with entity in list format check with mixed types with pytest.raises(ValueError) as excinfo: From c3edae37acfc95b062c1f62f87a8fef6d997d062 Mon Sep 17 00:00:00 2001 From: Terence Date: Fri, 26 Jun 2020 21:06:25 +0800 Subject: [PATCH 09/18] Update docs and example notebooks --- examples/basic/basic.ipynb | 203 ++++++----- ... Prediction (with Feast and XGBoost).ipynb | 339 +++++++++--------- 2 files changed, 294 insertions(+), 248 deletions(-) diff --git a/examples/basic/basic.ipynb b/examples/basic/basic.ipynb index 3d6bb3cc2a..fc8a72367e 100644 --- a/examples/basic/basic.ipynb +++ b/examples/basic/basic.ipynb @@ -148,48 +148,48 @@ "output_type": "stream", "text": [ " datetime customer_id daily_transactions \\\n", - "0 2020-06-09 00:00:00+00:00 1001 0.564751 \n", - "1 2020-06-09 00:00:00+00:00 1002 3.945566 \n", - "2 2020-06-09 00:00:00+00:00 1003 7.291928 \n", - "3 2020-06-09 00:00:00+00:00 1004 6.690477 \n", - "4 2020-06-09 00:00:00+00:00 1005 6.415899 \n", - "5 2020-06-10 00:00:00+00:00 1001 0.347294 \n", - "6 2020-06-10 00:00:00+00:00 1002 5.363853 \n", - "7 2020-06-10 00:00:00+00:00 1003 0.538129 \n", - "8 2020-06-10 00:00:00+00:00 1004 4.755425 \n", - "9 2020-06-10 00:00:00+00:00 1005 2.867527 \n", - "10 2020-06-11 00:00:00+00:00 1001 9.493098 \n", - "11 2020-06-11 00:00:00+00:00 1002 5.130665 \n", - "12 2020-06-11 00:00:00+00:00 1003 1.794191 \n", - "13 2020-06-11 00:00:00+00:00 1004 4.698504 \n", - "14 2020-06-11 00:00:00+00:00 1005 2.908603 \n", - "15 2020-06-12 00:00:00+00:00 1001 9.857894 \n", - "16 2020-06-12 00:00:00+00:00 1002 5.416553 \n", - "17 2020-06-12 00:00:00+00:00 1003 5.374058 \n", - "18 2020-06-12 00:00:00+00:00 1004 9.834441 \n", - "19 2020-06-12 00:00:00+00:00 1005 0.480373 \n", + "0 2020-06-17 00:00:00+00:00 1001 4.900417 \n", + "1 2020-06-17 00:00:00+00:00 1002 7.440329 \n", + "2 2020-06-17 00:00:00+00:00 1003 4.224760 \n", + "3 2020-06-17 00:00:00+00:00 1004 5.482722 \n", + "4 2020-06-17 00:00:00+00:00 1005 2.200896 \n", + "5 2020-06-18 00:00:00+00:00 1001 8.173628 \n", + "6 2020-06-18 00:00:00+00:00 1002 3.164327 \n", + "7 2020-06-18 00:00:00+00:00 1003 7.248387 \n", + "8 2020-06-18 00:00:00+00:00 1004 9.274397 \n", + "9 2020-06-18 00:00:00+00:00 1005 7.846449 \n", + "10 2020-06-19 00:00:00+00:00 1001 9.028874 \n", + "11 2020-06-19 00:00:00+00:00 1002 5.140390 \n", + "12 2020-06-19 00:00:00+00:00 1003 4.537877 \n", + "13 2020-06-19 00:00:00+00:00 1004 6.797491 \n", + "14 2020-06-19 00:00:00+00:00 1005 8.234574 \n", + "15 2020-06-20 00:00:00+00:00 1001 8.319164 \n", + "16 2020-06-20 00:00:00+00:00 1002 7.158817 \n", + "17 2020-06-20 00:00:00+00:00 1003 4.920308 \n", + "18 2020-06-20 00:00:00+00:00 1004 7.974404 \n", + "19 2020-06-20 00:00:00+00:00 1005 2.298012 \n", "\n", " total_transactions \n", - "0 73 \n", - "1 75 \n", - "2 95 \n", - "3 50 \n", - "4 65 \n", - "5 28 \n", - "6 76 \n", - "7 42 \n", + "0 45 \n", + "1 77 \n", + "2 8 \n", + "3 40 \n", + "4 53 \n", + "5 33 \n", + "6 93 \n", + "7 68 \n", "8 53 \n", - "9 61 \n", - "10 86 \n", - "11 31 \n", - "12 69 \n", - "13 9 \n", - "14 51 \n", - "15 23 \n", - "16 1 \n", - "17 34 \n", - "18 13 \n", - "19 50 \n" + "9 11 \n", + "10 19 \n", + "11 2 \n", + "12 1 \n", + "13 59 \n", + "14 95 \n", + "15 37 \n", + "16 93 \n", + "17 73 \n", + "18 46 \n", + "19 12 \n" ] } ], @@ -282,7 +282,7 @@ "name": "stdout", "output_type": "stream", "text": [ - "Feature set updated: \"customer_transactions\"\n" + "Feature set created: \"customer_transactions\"\n" ] } ], @@ -310,27 +310,26 @@ " ],\n", " \"features\": [\n", " {\n", - " \"name\": \"total_transactions\",\n", - " \"valueType\": \"INT64\"\n", - " },\n", - " {\n", " \"name\": \"daily_transactions\",\n", " \"valueType\": \"DOUBLE\"\n", + " },\n", + " {\n", + " \"name\": \"total_transactions\",\n", + " \"valueType\": \"INT64\"\n", " }\n", " ],\n", - " \"maxAge\": \"0s\",\n", " \"source\": {\n", " \"type\": \"KAFKA\",\n", " \"kafkaSourceConfig\": {\n", - " \"bootstrapServers\": \"kafka:9092,localhost:9094\",\n", + " \"bootstrapServers\": \"localhost:9094\",\n", " \"topic\": \"feast-features\"\n", " }\n", " },\n", " \"project\": \"default\"\n", " },\n", " \"meta\": {\n", - " \"createdTimestamp\": \"2020-06-18T12:04:08Z\",\n", - " \"status\": \"STATUS_READY\"\n", + " \"createdTimestamp\": \"2020-06-26T12:27:17Z\",\n", + " \"status\": \"STATUS_PENDING\"\n", " }\n", "}\n" ] @@ -357,17 +356,9 @@ }, { "cell_type": "code", - "execution_count": 11, + "execution_count": 10, "metadata": {}, "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "\r", - " 0%| | 0/50 [00:00customer_id\n", " 7032\n", " 7032\n", - " 0256-LTHVJ\n", + " 1934-SJVJK\n", " 1\n", " NaN\n", " NaN\n", @@ -1204,7 +1204,7 @@ ], "text/plain": [ " count unique top freq \\\n", - "customer_id 7032 7032 0256-LTHVJ 1 \n", + "customer_id 7032 7032 1934-SJVJK 1 \n", "gender 7032 NaN NaN NaN \n", "seniorcitizen 7032 NaN NaN NaN \n", "partner 7032 NaN NaN NaN \n", @@ -1562,7 +1562,7 @@ -0.007514979909200033, -0.01632782307070617, -0.013092839264555001, - -8.067457759124324E-4, + -0.0008067457759124324, -0.008507162405232782, -0.0071243969867245535, -0.010105418366566195, @@ -1579,16 +1579,16 @@ -0.003603167413572989, -0.015973079031173835, 0.001631872518613598, - 8.437084888753327E-4, + 0.0008437084888753327, 0.01319936726545174, -0.0010503798619876774, - -6.494991207074467E-4, + -0.0006494991207074467, -0.010516394125351592, -0.004318975744551275, 0.01627881894278637, 0.005285371870295646, -0.013779327268354416, - 4.783950839776602E-5 + 4.783950839776602e-05 ], [ -0.001819390613419179, @@ -1645,7 +1645,7 @@ 0.1425612874681736, -0.0010430787434336079, 0.0012346095228208073, - -2.855204740384597E-4, + -0.0002855204740384597, -0.2802019157901561, 0.08306706395255747, 0.24733370647615796, @@ -1795,7 +1795,7 @@ 0.11139068731904943, 0.0869415675760231, 0.09045518641091457, - -3.6426636786500763E-4, + -0.00036426636786500763, -0.17407470231312427, -0.26736609150996915, -0.08408097138202993, @@ -1807,7 +1807,7 @@ 0.510100290145439 ], [ - -8.067457759124324E-4, + -0.0008067457759124324, 0.059513871482029225, 0.1535564364182745, 0.013899668260943368, @@ -2159,8 +2159,8 @@ 0.04275388869901973, -0.0014704079413737543, -0.013428683581908657, - -4.678565419111244E-4, - -7.9001062411148E-4, + -0.0004678565419111244, + -0.00079001062411148, 0.014665913237832425, 0.013786269825793156, -0.16136793538251534, @@ -2205,7 +2205,7 @@ [ 0.004744965758849955, -0.18251949495535458, - -2.855204740384597E-4, + -0.0002855204740384597, 0.13838288994798562, 0.1718171065699321, -0.33279949932167546, @@ -2300,7 +2300,7 @@ 0.05762874929825631, 0.06758968145792751, -0.10954646682258033, - 1.971241890499786E-4, + 0.0001971241890499786, -0.2512993032332009, -0.017196458507178478, 0.15389327309228798, @@ -2419,13 +2419,13 @@ 0.1826633671546002 ], [ - 8.437084888753327E-4, + 0.0008437084888753327, 0.17132216591713703, -0.08320661736633733, -0.1492739811934336, 0.0027471183312986857, -0.11229466175861408, - -3.6426636786500763E-4, + -0.00036426636786500763, -0.003308493511411254, -0.11480726996437085, 0.1447470086556032, @@ -2475,7 +2475,7 @@ -0.3059839224841771, 0.3196937439459745, 0.006208692442055045, - 1.971241890499786E-4, + 0.0001971241890499786, -0.007422540118618717, -0.28809669563791657, -0.28558254792863036, @@ -2527,7 +2527,7 @@ -0.5924430690900127 ], [ - -6.494991207074467E-4, + -0.0006494991207074467, 0.0018604411671567017, -0.048481275955609554, -0.0014594010172321779, @@ -2579,7 +2579,7 @@ -0.03215707580783382, 0.014777815951004133, 0.023690157563696343, - -4.678565419111244E-4, + -0.0004678565419111244, 0.005613761063888361, -0.006230482005525746, -0.05215631834144274, @@ -2615,7 +2615,7 @@ -0.08063023581160907, 0.009750281447949528, 0.07573985650934924, - -7.9001062411148E-4, + -0.00079001062411148, 0.01746632456448475, -0.020153136101165186, -0.19870886170688193, @@ -2743,7 +2743,7 @@ 0.6510648032262032 ], [ - 4.783950839776602E-5, + 4.783950839776602e-05, 0.10241060539532633, 0.31907236323857324, 0.0646532494217739, @@ -3615,20 +3615,20 @@ "
\n", " \n", " \n", - "
\n", + "
\n", "