Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert python values into proto values in bulk #2172

Merged
merged 5 commits into from
Dec 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 21 additions & 12 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@
from feast.registry import Registry
from feast.repo_config import RepoConfig, load_repo_config
from feast.request_feature_view import RequestFeatureView
from feast.type_map import python_value_to_proto_value
from feast.type_map import python_values_to_proto_values
from feast.usage import log_exceptions, log_exceptions_and_usage, set_usage_attribute
from feast.value_type import ValueType
from feast.version import get_version

warnings.simplefilter("once", DeprecationWarning)
Expand Down Expand Up @@ -1207,11 +1208,13 @@ def _populate_odfv_dependencies(
):
# Add more feature values to the existing result rows for the request data features
for feature_name, feature_values in request_data_features.items():
for row_idx, feature_value in enumerate(feature_values):
proto_values = python_values_to_proto_values(
feature_values, ValueType.UNKNOWN
)

for row_idx, proto_value in enumerate(proto_values):
result_row = result_rows[row_idx]
result_row.fields[feature_name].CopyFrom(
python_value_to_proto_value(feature_value)
)
result_row.fields[feature_name].CopyFrom(proto_value)
result_row.statuses[
feature_name
] = GetOnlineFeaturesResponse.FieldStatus.PRESENT
Expand Down Expand Up @@ -1372,19 +1375,25 @@ def _augment_response_with_on_demand_transforms(
transformed_features_df = odfv.get_transformed_features_df(
initial_response_df, full_feature_names,
)
selected_subset = [
f for f in transformed_features_df.columns if f in _feature_refs
]

proto_values_by_column = {
feature: python_values_to_proto_values(
transformed_features_df[feature].values, ValueType.UNKNOWN
)
for feature in selected_subset
}

for row_idx in range(len(result_rows)):
result_row = result_rows[row_idx]

selected_subset = [
f for f in transformed_features_df.columns if f in _feature_refs
]

for transformed_feature in selected_subset:
odfv_result_names.add(transformed_feature)
proto_value = python_value_to_proto_value(
transformed_features_df[transformed_feature].values[row_idx]
result_row.fields[transformed_feature].CopyFrom(
proto_values_by_column[transformed_feature][row_idx]
)
result_row.fields[transformed_feature].CopyFrom(proto_value)
result_row.statuses[
transformed_feature
] = GetOnlineFeaturesResponse.FieldStatus.PRESENT
Expand Down
25 changes: 14 additions & 11 deletions sdk/python/feast/infra/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.registry import Registry
from feast.repo_config import RepoConfig
from feast.type_map import python_value_to_proto_value
from feast.type_map import python_values_to_proto_values
from feast.value_type import ValueType

PROVIDERS_CLASS_FOR_TYPE = {
"gcp": "feast.infra.gcp.GcpProvider",
Expand Down Expand Up @@ -305,26 +306,28 @@ def _convert_arrow_to_proto(
if isinstance(table, pyarrow.Table):
table = table.to_batches()[0]

# Handle join keys
join_key_values = {
k: table.column(k).to_numpy(zero_copy_only=False) for k in join_keys
columns = [(f.name, f.dtype) for f in feature_view.features] + [
(key, ValueType.UNKNOWN) for key in join_keys
]

proto_values_by_column = {
column: python_values_to_proto_values(
table.column(column).to_numpy(zero_copy_only=False), dtype
)
for column, dtype in columns
}

entity_keys = [
EntityKeyProto(
join_keys=join_keys,
entity_values=[
python_value_to_proto_value(join_key_values[k][idx]) for k in join_keys
],
entity_values=[proto_values_by_column[k][idx] for k in join_keys],
)
for idx in range(table.num_rows)
]

# Serialize the features per row
feature_dict = {
feature.name: [
python_value_to_proto_value(val, feature.dtype)
for val in table.column(feature.name).to_numpy(zero_copy_only=False)
]
feature.name: proto_values_by_column[feature.name]
for feature in feature_view.features
}
features = [dict(zip(feature_dict, vars)) for vars in zip(*feature_dict.values())]
Expand Down
4 changes: 3 additions & 1 deletion sdk/python/feast/online_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ def _infer_online_entity_rows(
if isinstance(value, Value):
proto_value = value
else:
proto_value = _python_value_to_proto_value(entity_type_map[key], value)
proto_value = _python_value_to_proto_value(
entity_type_map[key], [value]
)[0]
fields[key] = proto_value
entity_row_list.append(GetOnlineFeaturesRequestV2.EntityRow(fields=fields))
return entity_row_list
107 changes: 69 additions & 38 deletions sdk/python/feast/type_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import re
from datetime import datetime
from typing import Any, Dict, List, Optional, Set, Tuple, Type
from typing import Any, Dict, List, Optional, Set, Sized, Tuple, Type

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -238,50 +238,59 @@ def _type_err(item, dtype):
}


def _python_value_to_proto_value(feast_value_type: ValueType, value: Any) -> ProtoValue:
def _python_value_to_proto_value(
feast_value_type: ValueType, values: List[Any]
) -> List[ProtoValue]:
"""
Converts a Python (native, pandas) value to a Feast Proto Value based
on a provided value type

Args:
feast_value_type: The target value type
value: Value that will be converted
values: List of Values that will be converted

Returns:
Feast Value Proto
List of Feast Value Proto
"""
# ToDo: make a better sample for type checks (more than one element)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mind making an issue tracking this after the PR goes in?

sample = next(filter(_non_empty_value, values), None) # first not empty value
if sample is None:
# all input values are None or empty lists
return [ProtoValue()] * len(values)

# Detect list type and handle separately
if "list" in feast_value_type.name.lower():
# Feature can be list but None is still valid
if value is None:
return ProtoValue()

if feast_value_type in PYTHON_LIST_VALUE_TYPE_TO_PROTO_VALUE:
proto_type, field_name, valid_types = PYTHON_LIST_VALUE_TYPE_TO_PROTO_VALUE[
feast_value_type
]
f = {
field_name: proto_type(
val=[
item
if type(item) in valid_types
else _type_err(item, valid_types[0])
for item in value
]

if not all(type(item) in valid_types for item in sample):
first_invalid = next(
item for item in sample if type(item) not in valid_types
)
}
return ProtoValue(**f)
raise _type_err(first_invalid, valid_types[0])

return [
ProtoValue(**{field_name: proto_type(val=value)})
if value is not None
else ProtoValue()
for value in values
]

# Handle scalar types below
else:
if pd.isnull(value):
return ProtoValue()

if feast_value_type == ValueType.UNIX_TIMESTAMP:
if isinstance(value, datetime):
return ProtoValue(int64_val=int(value.timestamp()))
elif isinstance(value, Timestamp):
return ProtoValue(int64_val=int(value.ToSeconds()))
return ProtoValue(int64_val=int(value))
if isinstance(sample, datetime):
return [
ProtoValue(int64_val=int(value.timestamp())) for value in values
]
elif isinstance(sample, Timestamp):
return [
ProtoValue(int64_val=int(value.ToSeconds())) for value in values
]
return [ProtoValue(int64_val=int(value)) for value in values]

if feast_value_type in PYTHON_SCALAR_VALUE_TYPE_TO_PROTO_VALUE:
(
Expand All @@ -290,27 +299,37 @@ def _python_value_to_proto_value(feast_value_type: ValueType, value: Any) -> Pro
valid_scalar_types,
) = PYTHON_SCALAR_VALUE_TYPE_TO_PROTO_VALUE[feast_value_type]
if valid_scalar_types:
assert type(value) in valid_scalar_types
kwargs = {field_name: func(value)}
return ProtoValue(**kwargs)
assert type(sample) in valid_scalar_types

raise Exception(f"Unsupported data type: ${str(type(value))}")
return [
ProtoValue(**{field_name: func(value)})
if not pd.isnull(value)
else ProtoValue()
for value in values
]

raise Exception(f"Unsupported data type: ${str(type(values[0]))}")

def python_value_to_proto_value(
value: Any, feature_type: ValueType = ValueType.UNKNOWN
) -> ProtoValue:

def python_values_to_proto_values(
values: List[Any], feature_type: ValueType = ValueType.UNKNOWN
) -> List[ProtoValue]:
value_type = feature_type
if value is not None and feature_type == ValueType.UNKNOWN:
if isinstance(value, (list, np.ndarray)):
sample = next(filter(_non_empty_value, values), None) # first not empty value
if sample is not None and feature_type == ValueType.UNKNOWN:
if isinstance(sample, (list, np.ndarray)):
value_type = (
feature_type
if len(value) == 0
else python_type_to_feast_value_type("", value)
if len(sample) == 0
else python_type_to_feast_value_type("", sample)
)
else:
value_type = python_type_to_feast_value_type("", value)
return _python_value_to_proto_value(value_type, value)
value_type = python_type_to_feast_value_type("", sample)

if value_type == ValueType.UNKNOWN:
raise TypeError("Couldn't infer value type from empty value")

return _python_value_to_proto_value(value_type, values)


def _proto_value_to_value_type(proto_value: ProtoValue) -> ValueType:
Expand Down Expand Up @@ -453,3 +472,15 @@ def pa_to_redshift_value_type(pa_type: pyarrow.DataType) -> str:
}

return type_map[pa_type_as_str]


def _non_empty_value(value: Any) -> bool:
"""
Check that there's enough data we can use for type inference.
If primitive type - just checking that it's not None
If iterable - checking that there's some elements (len > 0)
String is special case: "" - empty string is considered non empty
"""
return value is not None and (
not isinstance(value, Sized) or len(value) > 0 or isinstance(value, str)
)