Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize _populate_result_rows_from_feature_view #2223

Merged
merged 10 commits into from
Jan 26, 2022
190 changes: 136 additions & 54 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1207,27 +1207,24 @@ def _get_online_features(
[DUMMY_ENTITY_VAL] * num_rows, DUMMY_ENTITY.value_type
)

# Initialize the set of EntityKeyProtos once and reuse them for each FeatureView
# to avoid initialization overhead.
entity_keys = [EntityKeyProto() for _ in range(num_rows)]
provider = self._get_provider()
for table, requested_features in grouped_refs:
# Get the correct set of entity values with the correct join keys.
table_entity_values = self._get_table_entity_values(
table, entity_name_to_join_key_map, join_key_values,
table_entity_values, idxs = self._get_unique_entities(
table, join_key_values, entity_name_to_join_key_map,
)

# Set the EntityKeyProtos inplace.
self._set_table_entity_keys(
table_entity_values, entity_keys,
# Fetch feature data for the minimum set of Entities.
feature_data = self._read_from_online_store(
table_entity_values, provider, requested_features, table,
)

# Populate the result_rows with the Features from the OnlineStore inplace.
self._populate_result_rows_from_feature_view(
self._populate_response_from_feature_data(
feature_data,
idxs,
online_features_response,
entity_keys,
full_feature_names,
provider,
requested_features,
table,
)
Expand Down Expand Up @@ -1312,22 +1309,6 @@ def _get_table_entity_values(
}
return entity_values

@staticmethod
def _set_table_entity_keys(
entity_values: Dict[str, List[Value]], entity_keys: List[EntityKeyProto],
):
"""
This method sets a list of EntityKeyProtos in place.
"""
keys = entity_values.keys()
# Columnar to row-wise (dict keys and values are guaranteed to have the same order).
rowise_values = zip(*entity_values.values())
for entity_key in entity_keys:
# Make sure entity_keys are empty before setting.
entity_key.Clear()
entity_key.join_keys.extend(keys)
entity_key.entity_values.extend(next(rowise_values))

@staticmethod
def _populate_result_rows_from_columnar(
online_features_response: GetOnlineFeaturesResponse,
Expand Down Expand Up @@ -1380,21 +1361,134 @@ def ensure_request_data_values_exist(
feature_names=missing_features
)

def _populate_result_rows_from_feature_view(
def _get_unique_entities(
    self,
    table: FeatureView,
    join_key_values: Dict[str, List[Value]],
    entity_name_to_join_key_map: Dict[str, str],
) -> Tuple[Tuple[Dict[str, Value], ...], Tuple[List[int], ...]]:
    """Return the unique composite Entities for a Feature View and where they occur.

    This allows the OnlineStore to be queried only once per distinct
    combination of Entities rather than requesting and processing the same
    combination multiple times.

    Args:
        table: The FeatureView whose entities are being resolved.
        join_key_values: Columnar entity values, keyed by join key.
        entity_name_to_join_key_map: Mapping passed through to
            `_get_table_entity_values` to select the right join keys.

    Returns:
        A pair `(unique_entities, indexes)` of equal-length tuples.
        `unique_entities[i]` maps join key to value for one distinct entity
        combination and `indexes[i]` lists the original row positions at
        which that combination appears. Both are empty tuples when there are
        no input rows.
    """
    # Get the correct set of entity values with the correct join keys.
    table_entity_values = self._get_table_entity_values(
        table, entity_name_to_join_key_map, join_key_values,
    )

    # Convert columnar data back to row-wise, remembering each row's
    # original position so results can be scattered back later.
    keys = table_entity_values.keys()
    rowise = list(enumerate(zip(*table_entity_values.values())))

    # With no rows there is nothing to group; unpacking the transposed
    # (empty) group list below would otherwise raise ValueError.
    if not rowise:
        return (), ()

    # Sort so that identical entity combinations become adjacent, which is
    # what itertools.groupby requires. Sorting on the raw oneof value is
    # sufficient as Entity types cannot be complex (ie. lists).
    rowise.sort(
        key=lambda row: tuple(getattr(x, x.WhichOneof("val")) for x in row[1])
    )

    # Identify unique entities and the indexes at which they occur.
    grouped = [
        (dict(zip(keys, entity)), [row_idx for row_idx, _ in group])
        for entity, group in itertools.groupby(rowise, key=lambda row: row[1])
    ]
    unique_entities: Tuple[Dict[str, Value], ...]
    indexes: Tuple[List[int], ...]
    unique_entities, indexes = zip(*grouped)
    return unique_entities, indexes

def _read_from_online_store(
    self,
    entity_rows: Iterable[Mapping[str, Value]],
    provider: Provider,
    requested_features: List[str],
    table: FeatureView,
) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]:
    """Read and process data from the OnlineStore for a given FeatureView.

    This method guarantees that the order of the data in each element of the
    List returned is the same as the order of `requested_features`.

    This method assumes that `provider.online_read` returns data for each
    combination of Entities in `entity_rows` in the same order as they
    are provided.

    Args:
        entity_rows: One mapping of join key to value per Entity combination.
        provider: The Provider whose `online_read` is used to fetch the data.
        requested_features: Names of the features to read from `table`.
        table: The FeatureView to read from.

    Returns:
        One `(event_timestamps, statuses, values)` tuple per row returned by
        `provider.online_read`. Each of the three lists has one element per
        requested feature.
    """
    # Instantiate one EntityKeyProto per Entity.
    entity_key_protos = [
        EntityKeyProto(join_keys=row.keys(), entity_values=row.values())
        for row in entity_rows
    ]

    # Fetch data for Entities.
    read_rows = provider.online_read(
        config=self.config,
        table=table,
        entity_keys=entity_key_protos,
        requested_features=requested_features,
    )

    num_features = len(requested_features)
    # The null Value is never mutated, so one instance can be shared by
    # every missing feature in every row.
    null_value = Value()
    read_row_protos = []
    for row_ts, feature_data in read_rows:
        # Each row needs its own Timestamp. The lists built here are only
        # copied into the response later, so reusing a single mutable proto
        # across iterations would make every row report the timestamp of the
        # last row processed (and rows without a timestamp would inherit a
        # previous row's value).
        row_ts_proto = Timestamp()
        if row_ts is not None:
            row_ts_proto.FromDatetime(row_ts)
        event_timestamps = [row_ts_proto] * num_features

        if feature_data is None:
            statuses = [FieldStatus.NOT_FOUND] * num_features
            values = [null_value] * num_features
        else:
            statuses = []
            values = []
            for feature_name in requested_features:
                # Make sure order of data is the same as requested_features.
                if feature_name in feature_data:
                    statuses.append(FieldStatus.PRESENT)
                    values.append(feature_data[feature_name])
                else:
                    statuses.append(FieldStatus.NOT_FOUND)
                    values.append(null_value)
        read_row_protos.append((event_timestamps, statuses, values))
    return read_row_protos

@staticmethod
def _populate_response_from_feature_data(
feature_data: Iterable[
Tuple[
Iterable[Timestamp], Iterable["FieldStatus.ValueType"], Iterable[Value]
]
],
indexes: Iterable[Iterable[int]],
online_features_response: GetOnlineFeaturesResponse,
full_feature_names: bool,
requested_features: Iterable[str],
table: FeatureView,
):
""" Populate the GetOnlineFeaturesResponse with feature data.

This method assumes that `_read_from_online_store` returns data for each
combination of Entities in `entity_rows` in the same order as they
are provided.

Args:
feature_data: A list of data in Protobuf form which was retrieved from the OnlineStore.
indexes: A list of indexes which should be the same length as `feature_data`. Each list
of indexes corresponds to a set of result rows in `online_features_response`.
online_features_response: The object to populate.
full_feature_names: A boolean that provides the option to add the feature view prefixes to the feature names,
changing them from the format "feature" to "feature_view__feature" (e.g., "daily_transactions" changes to
"customer_fv__daily_transactions").
requested_features: The names of the features in `feature_data`. This should be ordered in the same way as the
data in `feature_data`.
table: The FeatureView that `feature_data` was retrieved from.
"""
# Add the feature names to the response.
requested_feature_refs = [
f"{table.projection.name_to_use()}__{feature_name}"
if full_feature_names
Expand All @@ -1404,28 +1498,16 @@ def _populate_result_rows_from_feature_view(
online_features_response.metadata.feature_names.val.extend(
requested_feature_refs
)
# Each row is a set of features for a given entity key
for row_idx, read_row in enumerate(read_rows):
row_ts, feature_data = read_row
result_row = online_features_response.results[row_idx]
row_ts_proto = Timestamp()
if row_ts is not None:
row_ts_proto.FromDatetime(row_ts)
result_row.event_timestamps.extend([row_ts_proto] * len(requested_features))

if feature_data is None:
result_row.statuses.extend(
[FieldStatus.NOT_FOUND] * len(requested_features)
)
result_row.values.extend([Value()] * len(requested_features))
else:
for feature_name in requested_features:
if feature_name not in feature_data:
result_row.statuses.append(FieldStatus.NOT_FOUND)
result_row.values.append(Value())
else:
result_row.statuses.append(FieldStatus.PRESENT)
result_row.values.append(feature_data[feature_name])
# Populate the result with data fetched from the OnlineStore
# which is guaranteed to be aligned with `requested_features`.
for feature_row, dest_idxs in zip(feature_data, indexes):
event_timestamps, statuses, values = feature_row
for dest_idx in dest_idxs:
result_row = online_features_response.results[dest_idx]
result_row.event_timestamps.extend(event_timestamps)
result_row.statuses.extend(statuses)
result_row.values.extend(values)

@staticmethod
def _augment_response_with_on_demand_transforms(
Expand Down
Empty file.
Empty file.
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/online_stores/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,13 +277,13 @@ def _get_features_for_entity(
res_ts = Timestamp()
ts_val = res_val.pop(f"_ts:{feature_view}")
if ts_val:
res_ts.ParseFromString(ts_val)
res_ts.ParseFromString(bytes(ts_val))

res = {}
for feature_name, val_bin in res_val.items():
val = ValueProto()
if val_bin:
val.ParseFromString(val_bin)
val.ParseFromString(bytes(val_bin))
res[feature_name] = val

if not res:
Expand Down
8 changes: 7 additions & 1 deletion sdk/python/feast/repo_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import re
import sys
from importlib.abc import Loader
from importlib.machinery import ModuleSpec
from pathlib import Path
from typing import List, Set, Union, cast

Expand Down Expand Up @@ -78,7 +79,11 @@ def get_repo_files(repo_root: Path) -> List[Path]:
ignore_files = get_ignore_files(repo_root, ignore_paths)

# List all Python files in the root directory (recursively)
repo_files = {p.resolve() for p in repo_root.glob("**/*.py") if p.is_file()}
repo_files = {
p.resolve()
for p in repo_root.glob("**/*.py")
if p.is_file() and "__init__.py" != p.name
}
# Ignore all files that match any of the ignore paths in .feastignore
repo_files -= ignore_files

Expand Down Expand Up @@ -375,6 +380,7 @@ def init_repo(repo_name: str, template: str):
import importlib.util

spec = importlib.util.spec_from_file_location("bootstrap", str(bootstrap_path))
assert isinstance(spec, ModuleSpec)
bootstrap = importlib.util.module_from_spec(spec)
assert isinstance(spec.loader, Loader)
spec.loader.exec_module(bootstrap)
Expand Down
Empty file.
Empty file.
Empty file.
7 changes: 3 additions & 4 deletions sdk/python/feast/type_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import numpy as np
import pandas as pd
import pyarrow
from google.protobuf.pyext.cpp_message import GeneratedProtocolMessageType
from google.protobuf.timestamp_pb2 import Timestamp

from feast.protos.feast.types.Value_pb2 import (
Expand All @@ -32,7 +31,7 @@
StringList,
)
from feast.protos.feast.types.Value_pb2 import Value as ProtoValue
from feast.value_type import ValueType
from feast.value_type import ListType, ValueType


def feast_value_type_to_python_type(field_value_proto: ProtoValue) -> Any:
Expand Down Expand Up @@ -195,7 +194,7 @@ def _type_err(item, dtype):


PYTHON_LIST_VALUE_TYPE_TO_PROTO_VALUE: Dict[
ValueType, Tuple[GeneratedProtocolMessageType, str, List[Type]]
ValueType, Tuple[ListType, str, List[Type]]
] = {
ValueType.FLOAT_LIST: (
FloatList,
Expand Down Expand Up @@ -273,7 +272,7 @@ def _python_value_to_proto_value(
raise _type_err(first_invalid, valid_types[0])

return [
ProtoValue(**{field_name: proto_type(val=value)})
ProtoValue(**{field_name: proto_type(val=value)}) # type: ignore
if value is not None
else ProtoValue()
for value in values
Expand Down
22 changes: 22 additions & 0 deletions sdk/python/feast/value_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import enum
from typing import Type, Union

from feast.protos.feast.types.Value_pb2 import (
BoolList,
BytesList,
DoubleList,
FloatList,
Int32List,
Int64List,
StringList,
)


class ValueType(enum.Enum):
Expand All @@ -37,3 +48,14 @@ class ValueType(enum.Enum):
BOOL_LIST = 17
UNIX_TIMESTAMP_LIST = 18
NULL = 19


# Type alias for "any one of the list-message classes" imported from
# Value_pb2 above. Each member is Type[...], i.e. the class object itself
# rather than an instance of it.
ListType = Union[
Type[BoolList],
Type[BytesList],
Type[DoubleList],
Type[FloatList],
Type[Int32List],
Type[Int64List],
Type[StringList],
]
Loading