Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Schema update #2509

Merged
merged 19 commits into from
Apr 13, 2022
8 changes: 4 additions & 4 deletions docs/getting-started/concepts/feature-view.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,10 @@ from feast import Field, Float64, RequestSource
# available at request time (e.g. part of the user initiated HTTP request)
input_request = RequestSource(
name="vals_to_add",
schema={
"val_to_add": ValueType.INT64,
"val_to_add_2": ValueType.INT64
}
schema=[
Field(name="val_to_add", dtype=PrimitiveFeastType.INT64),
Field(name="val_to_add_2", dtype=PrimitiveFeastType.INT64),
]
)

# Use the input data and feature view features to create new features
Expand Down
2 changes: 1 addition & 1 deletion docs/how-to-guides/adding-a-new-offline-store.md
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ Finally, the custom data source class can be used in the feature repo to define a
```python
pdriver_hourly_stats = CustomFileDataSource(
path="feature_repo/data/driver_stats.parquet",
event_timestamp_column="event_timestamp",
timestamp_field="event_timestamp",
created_timestamp_column="created",
)

Expand Down
11 changes: 9 additions & 2 deletions examples/java-demo/feature_repo/driver_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from feast.request_feature_view import RequestFeatureView
from feast.types import Float32, Float64, Int64, String
from google.protobuf.duration_pb2 import Duration
from feast.field import Field

from feast import Entity, Feature, FeatureView, FileSource, ValueType

Expand Down Expand Up @@ -33,7 +34,10 @@
# available at request time (e.g. part of the user initiated HTTP request)
input_request = RequestSource(
name="vals_to_add",
schema={"val_to_add": ValueType.INT64, "val_to_add_2": ValueType.INT64},
schema=[
Field(name="val_to_add", dtype=Int64),
Field(name="val_to_add_2", dtype=Int64),
],
)

# Define an on demand feature view which can generate new features based on
Expand All @@ -59,6 +63,9 @@ def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame:
driver_age_request_fv = RequestFeatureView(
name="driver_age",
request_data_source=RequestSource(
name="driver_age", schema={"driver_age": ValueType.INT64,}
name="driver_age",
schema=[
Field(name="driver_age", dtype=Int64),
],
),
)
Binary file not shown.
4 changes: 2 additions & 2 deletions go/internal/feast/model/ondemandfeatureview.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ func (fs *OnDemandFeatureView) ProjectWithFeatures(featureNames []string) (*OnDe
func (fs *OnDemandFeatureView) GetRequestDataSchema() map[string]types.ValueType_Enum {
schema := make(map[string]types.ValueType_Enum)
for _, requestDataSource := range fs.SourceRequestDataSources {
for fieldName, fieldValueType := range requestDataSource.Schema {
schema[fieldName] = fieldValueType
for _, featureSpec := range requestDataSource.Schema {
schema[featureSpec.Name] = featureSpec.ValueType
}
}
return schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from feast.types import Float32, Float64, Int64
from feast.value_type import ValueType
from google.protobuf.duration_pb2 import Duration

from feast import FileSource

file_path = "driver_stats.parquet"
Expand Down
6 changes: 5 additions & 1 deletion protos/feast/core/DataSource.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ option java_package = "feast.proto.core";

import "feast/core/DataFormat.proto";
import "feast/types/Value.proto";
import "feast/core/Feature.proto";

// Defines a Data Source that can be used to source Feature data
// Next available id: 28
Expand Down Expand Up @@ -212,7 +213,10 @@ message DataSource {
message RequestDataOptions {
reserved 1;
// Mapping of feature name to type
map<string, feast.types.ValueType.Enum> schema = 2;
map<string, feast.types.ValueType.Enum> deprecated_schema = 2;

repeated FeatureSpecV2 schema = 3;

}

// Defines options for DataSource that supports pushing data to it. This allows data to be pushed to
Expand Down
2 changes: 1 addition & 1 deletion protos/feast/types/Field.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ option go_package = "github.com/feast-dev/feast/go/protos/feast/types";

message Field {
string name = 1;
feast.types.Value value = 2;
feast.types.ValueType.Enum value = 2;
}
113 changes: 93 additions & 20 deletions sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
import enum
import warnings
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Iterable, Optional, Tuple
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union

from google.protobuf.json_format import MessageToJson

from feast import type_map
from feast.data_format import StreamFormat
from feast.field import Field
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
from feast.repo_config import RepoConfig, get_data_source_class_from_type
from feast.types import VALUE_TYPES_TO_FEAST_TYPES
from feast.value_type import ValueType


Expand Down Expand Up @@ -449,27 +451,45 @@ class RequestSource(DataSource):

Args:
name: Name of the request data source
schema: Schema mapping from the input feature name to a ValueType
schema: Schema of the request data, either a list of Field objects or (deprecated) a dict mapping each input feature name to a ValueType.
description (optional): A human-readable description.
tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
owner (optional): The owner of the request data source, typically the email of the primary
maintainer.
"""

name: str
schema: Dict[str, ValueType]
schema: List[Field]

def __init__(
self,
name: str,
schema: Dict[str, ValueType],
schema: Union[Dict[str, ValueType], List[Field]],
description: Optional[str] = "",
tags: Optional[Dict[str, str]] = None,
owner: Optional[str] = "",
):
"""Creates a RequestSource object."""
super().__init__(name=name, description=description, tags=tags, owner=owner)
self.schema = schema
if isinstance(schema, Dict):
warnings.warn(
"Schema in RequestSource is changing type. The schema data type Dict[str, ValueType] is being deprecated in Feast 0.23. "
"Please use List[Field] instead for the schema",
DeprecationWarning,
)
schemaList = []
for key, valueType in schema.items():
schemaList.append(
Field(name=key, dtype=VALUE_TYPES_TO_FEAST_TYPES[valueType])
)
self.schema = schemaList
elif isinstance(schema, List):
self.schema = schema
else:
raise Exception(
"Schema type must be either dictionary or list, not "
+ str(type(schema))
)

def validate(self, config: RepoConfig):
pass
Expand All @@ -479,33 +499,86 @@ def get_table_column_names_and_types(
) -> Iterable[Tuple[str, str]]:
pass

def __eq__(self, other):
    """Return whether ``other`` is a RequestSource equal to this one.

    Two request sources are equal when their name, description, owner and
    tags match and their schemas contain the same fields in the same order.

    Args:
        other: The object to compare against.

    Returns:
        True if all compared attributes and every schema field match,
        False otherwise.

    Raises:
        TypeError: If ``other`` is not a RequestSource.
    """
    if not isinstance(other, RequestSource):
        raise TypeError(
            "Comparisons should only involve RequestSource class objects."
        )
    if (
        self.name != other.name
        or self.description != other.description
        or self.owner != other.owner
        or self.tags != other.tags
    ):
        return False
    if not (isinstance(self.schema, List) and isinstance(other.schema, List)):
        return False
    # zip() stops at the shorter list, so lengths are compared first;
    # otherwise a schema that is a strict prefix of the other would be
    # reported as equal.
    if len(self.schema) != len(other.schema):
        return False
    return not any(
        field1 != field2 for field1, field2 in zip(self.schema, other.schema)
    )

# A class that defines __eq__ without __hash__ has its implicit __hash__ set
# to None, so the parent implementation is delegated to explicitly here.
def __hash__(self):
return super().__hash__()

@staticmethod
def from_proto(data_source: DataSourceProto):

deprecated_schema = data_source.request_data_options.deprecated_schema
schema_pb = data_source.request_data_options.schema
schema = {}
for key, val in schema_pb.items():
schema[key] = ValueType(val)
return RequestSource(
name=data_source.name,
schema=schema,
description=data_source.description,
tags=dict(data_source.tags),
owner=data_source.owner,
)

if deprecated_schema and not schema_pb:
warnings.warn(
"Schema in RequestSource is changing type. The schema data type Dict[str, ValueType] is being deprecated in Feast 0.23. "
"Please use List[Field] instead for the schema",
DeprecationWarning,
)
dict_schema = {}
for key, val in deprecated_schema.items():
dict_schema[key] = ValueType(val)
return RequestSource(
name=data_source.name,
schema=dict_schema,
description=data_source.description,
tags=dict(data_source.tags),
owner=data_source.owner,
)
else:
list_schema = []
for field_proto in schema_pb:
list_schema.append(Field.from_proto(field_proto))

return RequestSource(
name=data_source.name,
schema=list_schema,
description=data_source.description,
tags=dict(data_source.tags),
owner=data_source.owner,
)

def to_proto(self) -> DataSourceProto:
schema_pb = {}
for key, value in self.schema.items():
schema_pb[key] = value.value
options = DataSourceProto.RequestDataOptions(schema=schema_pb)

schema_pb = []

if isinstance(self.schema, Dict):
for key, value in self.schema.items():
schema_pb.append(
Field(
name=key, dtype=VALUE_TYPES_TO_FEAST_TYPES[value.value]
).to_proto()
)
else:
for field in self.schema:
schema_pb.append(field.to_proto())
data_source_proto = DataSourceProto(
name=self.name,
type=DataSourceProto.REQUEST_SOURCE,
request_data_options=options,
description=self.description,
tags=self.tags,
owner=self.owner,
)
data_source_proto.request_data_options.schema.extend(schema_pb)

return data_source_proto

Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/field.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def from_proto(cls, field_proto: FieldProto):
field_proto: FieldProto protobuf object
"""
value_type = ValueType(field_proto.value_type)
return cls(name=field_proto.name, dtype=from_value_type(value_type))
return cls(name=field_proto.name, dtype=from_value_type(value_type=value_type))

@classmethod
def from_feature(cls, feature: Feature):
Expand Down
18 changes: 14 additions & 4 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,17 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
def get_request_data_schema(self) -> Dict[str, ValueType]:
schema: Dict[str, ValueType] = {}
for request_source in self.source_request_sources.values():
schema.update(request_source.schema)
if isinstance(request_source.schema, List):
new_schema = {}
for field in request_source.schema:
new_schema[field.name] = field.dtype.to_value_type()
schema.update(new_schema)
elif isinstance(request_source.schema, Dict):
kevjumba marked this conversation as resolved.
Show resolved Hide resolved
schema.update(request_source.schema)
else:
# Python f-strings interpolate with "{...}"; a leading "$" would be emitted
# literally in the error message.
raise Exception(
    f"Request source schema is not correct type: {str(type(request_source.schema))}"
)
return schema

def get_transformed_features_df(
Expand Down Expand Up @@ -409,9 +419,9 @@ def infer_features(self):
)
df[f"{feature.name}"] = pd.Series(dtype=dtype)
for request_data in self.source_request_sources.values():
for feature_name, feature_type in request_data.schema.items():
dtype = feast_value_type_to_pandas_type(feature_type)
df[f"{feature_name}"] = pd.Series(dtype=dtype)
for field in request_data.schema:
dtype = feast_value_type_to_pandas_type(field.dtype.to_value_type())
df[f"{field.name}"] = pd.Series(dtype=dtype)
output_df: pd.DataFrame = self.udf.__call__(df)
inferred_features = []
for f, dt in zip(output_df.columns, output_df.dtypes):
Expand Down
15 changes: 10 additions & 5 deletions sdk/python/feast/request_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from feast.base_feature_view import BaseFeatureView
from feast.data_source import RequestSource
from feast.feature_view_projection import FeatureViewProjection
from feast.field import Field, from_value_type
from feast.field import Field
from feast.protos.feast.core.RequestFeatureView_pb2 import (
RequestFeatureView as RequestFeatureViewProto,
)
Expand Down Expand Up @@ -63,12 +63,17 @@ def __init__(
DeprecationWarning,
)

if isinstance(request_data_source.schema, Dict):
new_features = [
Field(name=name, dtype=dtype)
for name, dtype in request_data_source.schema.items()
]
else:
new_features = request_data_source.schema

super().__init__(
name=name,
features=[
Field(name=name, dtype=from_value_type(value_type))
for name, value_type in request_data_source.schema.items()
],
features=new_features,
description=description,
tags=tags,
owner=owner,
Expand Down
9 changes: 9 additions & 0 deletions sdk/python/feast/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ def to_value_type(self) -> ValueType:
def __str__(self):
return PRIMITIVE_FEAST_TYPES_TO_STRING[self.name]

def __eq__(self, other):
    """Compare by enum value; anything that is not a PrimitiveFeastType is unequal."""
    if not isinstance(other, PrimitiveFeastType):
        return False
    return self.value == other.value

def __hash__(self):
    """Hash on the string that PRIMITIVE_FEAST_TYPES_TO_STRING maps this member's name to."""
    type_string = PRIMITIVE_FEAST_TYPES_TO_STRING[self.name]
    return hash(type_string)


Invalid = PrimitiveFeastType.INVALID
Bytes = PrimitiveFeastType.BYTES
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ def similarity_feature_view(


def create_conv_rate_request_source():
return RequestSource(name="conv_rate_input", schema={"val_to_add": ValueType.INT32})
return RequestSource(
name="conv_rate_input", schema=[Field(name="val_to_add", dtype=Int32)],
)


def create_similarity_request_source():
Expand Down
Loading