Skip to content

Commit

Permalink
fix!: GetOnlineFeatureResponse in Python feature server should be con…
Browse files Browse the repository at this point in the history
…sisten… (#2418)

* GetOnlineFeatureResponse in Python feature server should be consistent with Java implementation

Signed-off-by: pyalex <[email protected]>

* fix local e2e test

Signed-off-by: pyalex <[email protected]>

* reuse timestamp in response generation

Signed-off-by: pyalex <[email protected]>
  • Loading branch information
pyalex authored Mar 21, 2022
1 parent 068c765 commit 835eef2
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 56 deletions.
77 changes: 44 additions & 33 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1359,9 +1359,7 @@ def _get_online_features(
)

# Populate online features response proto with join keys and request data features
online_features_response = GetOnlineFeaturesResponse(
results=[GetOnlineFeaturesResponse.FeatureVector() for _ in range(num_rows)]
)
online_features_response = GetOnlineFeaturesResponse(results=[])
self._populate_result_rows_from_columnar(
online_features_response=online_features_response,
data=dict(**join_key_values, **request_data_features),
Expand Down Expand Up @@ -1495,14 +1493,14 @@ def _populate_result_rows_from_columnar(
timestamp = Timestamp() # Only initialize this timestamp once.
# Add more values to the existing result rows
for feature_name, feature_values in data.items():

online_features_response.metadata.feature_names.val.append(feature_name)

for row_idx, proto_value in enumerate(feature_values):
result_row = online_features_response.results[row_idx]
result_row.values.append(proto_value)
result_row.statuses.append(FieldStatus.PRESENT)
result_row.event_timestamps.append(timestamp)
online_features_response.results.append(
GetOnlineFeaturesResponse.FeatureVector(
values=feature_values,
statuses=[FieldStatus.PRESENT] * len(feature_values),
event_timestamps=[timestamp] * len(feature_values),
)
)

@staticmethod
def get_needed_request_data(
Expand Down Expand Up @@ -1643,7 +1641,7 @@ def _populate_response_from_feature_data(
Iterable[Timestamp], Iterable["FieldStatus.ValueType"], Iterable[Value]
]
],
indexes: Iterable[Iterable[int]],
indexes: Iterable[List[int]],
online_features_response: GetOnlineFeaturesResponse,
full_feature_names: bool,
requested_features: Iterable[str],
Expand Down Expand Up @@ -1678,15 +1676,21 @@ def _populate_response_from_feature_data(
requested_feature_refs
)

timestamps, statuses, values = zip(*feature_data)

# Populate the result with data fetched from the OnlineStore
# which is guarenteed to be aligned with `requested_features`.
for feature_row, dest_idxs in zip(feature_data, indexes):
event_timestamps, statuses, values = feature_row
for dest_idx in dest_idxs:
result_row = online_features_response.results[dest_idx]
result_row.event_timestamps.extend(event_timestamps)
result_row.statuses.extend(statuses)
result_row.values.extend(values)
# which is guaranteed to be aligned with `requested_features`.
for (
feature_idx,
(timestamp_vector, statuses_vector, values_vector),
) in enumerate(zip(zip(*timestamps), zip(*statuses), zip(*values))):
online_features_response.results.append(
GetOnlineFeaturesResponse.FeatureVector(
values=apply_list_mapping(values_vector, indexes),
statuses=apply_list_mapping(statuses_vector, indexes),
event_timestamps=apply_list_mapping(timestamp_vector, indexes),
)
)

@staticmethod
def _augment_response_with_on_demand_transforms(
Expand Down Expand Up @@ -1749,13 +1753,14 @@ def _augment_response_with_on_demand_transforms(
odfv_result_names |= set(selected_subset)

online_features_response.metadata.feature_names.val.extend(selected_subset)

for row_idx in range(len(online_features_response.results)):
result_row = online_features_response.results[row_idx]
for feature_idx, transformed_feature in enumerate(selected_subset):
result_row.values.append(proto_values[feature_idx][row_idx])
result_row.statuses.append(FieldStatus.PRESENT)
result_row.event_timestamps.append(Timestamp())
for feature_idx in range(len(selected_subset)):
online_features_response.results.append(
GetOnlineFeaturesResponse.FeatureVector(
values=proto_values[feature_idx],
statuses=[FieldStatus.PRESENT] * len(proto_values[feature_idx]),
event_timestamps=[Timestamp()] * len(proto_values[feature_idx]),
)
)

@staticmethod
def _drop_unneeded_columns(
Expand All @@ -1782,13 +1787,7 @@ def _drop_unneeded_columns(

for idx in reversed(unneeded_feature_indices):
del online_features_response.metadata.feature_names.val[idx]

for row_idx in range(len(online_features_response.results)):
result_row = online_features_response.results[row_idx]
for idx in reversed(unneeded_feature_indices):
del result_row.values[idx]
del result_row.statuses[idx]
del result_row.event_timestamps[idx]
del online_features_response.results[idx]

def _get_feature_views_to_use(
self,
Expand Down Expand Up @@ -2034,3 +2033,15 @@ def _validate_data_sources(data_sources: List[DataSource]):
)
else:
ds_names.add(case_insensitive_ds_name)


def apply_list_mapping(
lst: Iterable[Any], mapping_indexes: Iterable[List[int]]
) -> Iterable[Any]:
output_len = sum(len(item) for item in mapping_indexes)
output = [None] * output_len
for elem, destinations in zip(lst, mapping_indexes):
for idx in destinations:
output[idx] = elem

return output
33 changes: 14 additions & 19 deletions sdk/python/feast/online_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,8 @@ def __init__(self, online_response_proto: GetOnlineFeaturesResponse):
for idx, val in enumerate(self.proto.metadata.feature_names.val):
if val == DUMMY_ENTITY_ID:
del self.proto.metadata.feature_names.val[idx]
for result in self.proto.results:
del result.values[idx]
del result.statuses[idx]
del result.event_timestamps[idx]
del self.proto.results[idx]

break

def to_dict(self, include_event_timestamps: bool = False) -> Dict[str, Any]:
Expand All @@ -55,21 +53,18 @@ def to_dict(self, include_event_timestamps: bool = False) -> Dict[str, Any]:
"""
response: Dict[str, List[Any]] = {}

for result in self.proto.results:
for idx, feature_ref in enumerate(self.proto.metadata.feature_names.val):
native_type_value = feast_value_type_to_python_type(result.values[idx])
if feature_ref not in response:
response[feature_ref] = [native_type_value]
else:
response[feature_ref].append(native_type_value)

if include_event_timestamps:
event_ts = result.event_timestamps[idx].seconds
timestamp_ref = feature_ref + TIMESTAMP_POSTFIX
if timestamp_ref not in response:
response[timestamp_ref] = [event_ts]
else:
response[timestamp_ref].append(event_ts)
for feature_ref, feature_vector in zip(
self.proto.metadata.feature_names.val, self.proto.results
):
response[feature_ref] = [
feast_value_type_to_python_type(v) for v in feature_vector.values
]

if include_event_timestamps:
timestamp_ref = feature_ref + TIMESTAMP_POSTFIX
response[timestamp_ref] = [
ts.seconds for ts in feature_vector.event_timestamps
]

return response

Expand Down
4 changes: 2 additions & 2 deletions sdk/python/tests/integration/online_store/test_e2e_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ def _assert_online_features(

# Float features should still be floats from the online store...
assert (
response.proto.results[0]
.values[
response.proto.results[
list(response.proto.metadata.feature_names.val).index(
"driver_hourly_stats__conv_rate"
)
]
.values[0]
.float_val
> 0
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,9 @@ def _get_online_features_dict_remotely(
)
keys = response["metadata"]["feature_names"]
# Get rid of unnecessary structure in the response, leaving list of dicts
response = [row["values"] for row in response["results"]]
values = [row["values"] for row in response["results"]]
# Convert list of dicts (response) into dict of lists which is the format of the return value
return {key: [row[idx] for row in response] for idx, key in enumerate(keys)}
return {key: feature_vector for key, feature_vector in zip(keys, values)}


def get_online_features_dict(
Expand Down

0 comments on commit 835eef2

Please sign in to comment.