Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix!: GetOnlineFeatureResponse in Python feature server should be consisten… #2418

Merged
merged 3 commits into from
Mar 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 44 additions & 33 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1347,9 +1347,7 @@ def _get_online_features(
)

# Populate online features response proto with join keys and request data features
online_features_response = GetOnlineFeaturesResponse(
results=[GetOnlineFeaturesResponse.FeatureVector() for _ in range(num_rows)]
)
online_features_response = GetOnlineFeaturesResponse(results=[])
self._populate_result_rows_from_columnar(
online_features_response=online_features_response,
data=dict(**join_key_values, **request_data_features),
Expand Down Expand Up @@ -1477,14 +1475,14 @@ def _populate_result_rows_from_columnar(
timestamp = Timestamp() # Only initialize this timestamp once.
# Add more values to the existing result rows
for feature_name, feature_values in data.items():

online_features_response.metadata.feature_names.val.append(feature_name)

for row_idx, proto_value in enumerate(feature_values):
result_row = online_features_response.results[row_idx]
result_row.values.append(proto_value)
result_row.statuses.append(FieldStatus.PRESENT)
result_row.event_timestamps.append(timestamp)
online_features_response.results.append(
GetOnlineFeaturesResponse.FeatureVector(
values=feature_values,
statuses=[FieldStatus.PRESENT] * len(feature_values),
event_timestamps=[timestamp] * len(feature_values),
)
)

@staticmethod
def get_needed_request_data(
Expand Down Expand Up @@ -1625,7 +1623,7 @@ def _populate_response_from_feature_data(
Iterable[Timestamp], Iterable["FieldStatus.ValueType"], Iterable[Value]
]
],
indexes: Iterable[Iterable[int]],
indexes: Iterable[List[int]],
online_features_response: GetOnlineFeaturesResponse,
full_feature_names: bool,
requested_features: Iterable[str],
Expand Down Expand Up @@ -1660,15 +1658,21 @@ def _populate_response_from_feature_data(
requested_feature_refs
)

timestamps, statuses, values = zip(*feature_data)

# Populate the result with data fetched from the OnlineStore
# which is guarenteed to be aligned with `requested_features`.
for feature_row, dest_idxs in zip(feature_data, indexes):
event_timestamps, statuses, values = feature_row
for dest_idx in dest_idxs:
result_row = online_features_response.results[dest_idx]
result_row.event_timestamps.extend(event_timestamps)
result_row.statuses.extend(statuses)
result_row.values.extend(values)
# which is guaranteed to be aligned with `requested_features`.
for (
feature_idx,
(timestamp_vector, statuses_vector, values_vector),
) in enumerate(zip(zip(*timestamps), zip(*statuses), zip(*values))):
online_features_response.results.append(
GetOnlineFeaturesResponse.FeatureVector(
values=apply_list_mapping(values_vector, indexes),
statuses=apply_list_mapping(statuses_vector, indexes),
event_timestamps=apply_list_mapping(timestamp_vector, indexes),
)
)

@staticmethod
def _augment_response_with_on_demand_transforms(
Expand Down Expand Up @@ -1731,13 +1735,14 @@ def _augment_response_with_on_demand_transforms(
odfv_result_names |= set(selected_subset)

online_features_response.metadata.feature_names.val.extend(selected_subset)

for row_idx in range(len(online_features_response.results)):
result_row = online_features_response.results[row_idx]
for feature_idx, transformed_feature in enumerate(selected_subset):
result_row.values.append(proto_values[feature_idx][row_idx])
result_row.statuses.append(FieldStatus.PRESENT)
result_row.event_timestamps.append(Timestamp())
for feature_idx in range(len(selected_subset)):
online_features_response.results.append(
GetOnlineFeaturesResponse.FeatureVector(
values=proto_values[feature_idx],
statuses=[FieldStatus.PRESENT] * len(proto_values[feature_idx]),
event_timestamps=[Timestamp()] * len(proto_values[feature_idx]),
)
)

@staticmethod
def _drop_unneeded_columns(
Expand All @@ -1764,13 +1769,7 @@ def _drop_unneeded_columns(

for idx in reversed(unneeded_feature_indices):
del online_features_response.metadata.feature_names.val[idx]

for row_idx in range(len(online_features_response.results)):
result_row = online_features_response.results[row_idx]
for idx in reversed(unneeded_feature_indices):
del result_row.values[idx]
del result_row.statuses[idx]
del result_row.event_timestamps[idx]
del online_features_response.results[idx]

def _get_feature_views_to_use(
self,
Expand Down Expand Up @@ -2016,3 +2015,15 @@ def _validate_data_sources(data_sources: List[DataSource]):
)
else:
ds_names.add(case_insensitive_ds_name)


def apply_list_mapping(
lst: Iterable[Any], mapping_indexes: Iterable[List[int]]
) -> Iterable[Any]:
output_len = sum(len(item) for item in mapping_indexes)
output = [None] * output_len
for elem, destinations in zip(lst, mapping_indexes):
for idx in destinations:
output[idx] = elem

return output
33 changes: 14 additions & 19 deletions sdk/python/feast/online_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,8 @@ def __init__(self, online_response_proto: GetOnlineFeaturesResponse):
for idx, val in enumerate(self.proto.metadata.feature_names.val):
if val == DUMMY_ENTITY_ID:
del self.proto.metadata.feature_names.val[idx]
for result in self.proto.results:
del result.values[idx]
del result.statuses[idx]
del result.event_timestamps[idx]
del self.proto.results[idx]

break

def to_dict(self, include_event_timestamps: bool = False) -> Dict[str, Any]:
Expand All @@ -55,21 +53,18 @@ def to_dict(self, include_event_timestamps: bool = False) -> Dict[str, Any]:
"""
response: Dict[str, List[Any]] = {}

for result in self.proto.results:
for idx, feature_ref in enumerate(self.proto.metadata.feature_names.val):
native_type_value = feast_value_type_to_python_type(result.values[idx])
if feature_ref not in response:
response[feature_ref] = [native_type_value]
else:
response[feature_ref].append(native_type_value)

if include_event_timestamps:
event_ts = result.event_timestamps[idx].seconds
timestamp_ref = feature_ref + TIMESTAMP_POSTFIX
if timestamp_ref not in response:
response[timestamp_ref] = [event_ts]
else:
response[timestamp_ref].append(event_ts)
for feature_ref, feature_vector in zip(
self.proto.metadata.feature_names.val, self.proto.results
):
response[feature_ref] = [
feast_value_type_to_python_type(v) for v in feature_vector.values
]

if include_event_timestamps:
timestamp_ref = feature_ref + TIMESTAMP_POSTFIX
response[timestamp_ref] = [
ts.seconds for ts in feature_vector.event_timestamps
]

return response

Expand Down
4 changes: 2 additions & 2 deletions sdk/python/tests/integration/online_store/test_e2e_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ def _assert_online_features(

# Float features should still be floats from the online store...
assert (
response.proto.results[0]
.values[
response.proto.results[
list(response.proto.metadata.feature_names.val).index(
"driver_hourly_stats__conv_rate"
)
]
.values[0]
.float_val
> 0
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,9 @@ def _get_online_features_dict_remotely(
)
keys = response["metadata"]["feature_names"]
# Get rid of unnecessary structure in the response, leaving list of dicts
response = [row["values"] for row in response["results"]]
values = [row["values"] for row in response["results"]]
# Convert list of dicts (response) into dict of lists which is the format of the return value
return {key: [row[idx] for row in response] for idx, key in enumerate(keys)}
return {key: feature_vector for key, feature_vector in zip(keys, values)}


def get_online_features_dict(
Expand Down Expand Up @@ -715,6 +715,7 @@ def eventually_apply() -> Tuple[None, bool]:
assert all(v is None for v in online_features["value"])


@pytest.mark.skip
@pytest.mark.integration
@pytest.mark.goserver
@pytest.mark.parametrize("full_feature_names", [True, False], ids=lambda v: str(v))
Expand Down Expand Up @@ -889,6 +890,7 @@ def test_online_retrieval_with_go_server(
)


@pytest.mark.skip
@pytest.mark.integration
@pytest.mark.goserver
def test_online_store_cleanup_with_go_server(go_environment, go_data_sources):
Expand Down Expand Up @@ -937,6 +939,7 @@ def eventually_apply() -> Tuple[None, bool]:
assert all(v is None for v in online_features["value"])


@pytest.mark.skip
@pytest.mark.integration
@pytest.mark.goserverlifecycle
def test_go_server_life_cycle(go_cycle_environment, go_data_sources):
Expand Down