diff --git a/docs/getting-started/concepts/feature-view.md b/docs/getting-started/concepts/feature-view.md index fa6612fbad..2786591d29 100644 --- a/docs/getting-started/concepts/feature-view.md +++ b/docs/getting-started/concepts/feature-view.md @@ -144,10 +144,10 @@ from feast import Field, Float64, RequestSource # available at request time (e.g. part of the user initiated HTTP request) input_request = RequestSource( name="vals_to_add", - schema={ - "val_to_add": ValueType.INT64, - "val_to_add_2": ValueType.INT64 - } + schema=[ + Field(name="val_to_add", dtype=PrimitiveFeastType.INT64), + Field(name="val_to_add_2": dtype=PrimitiveFeastType.INT64), + ] ) # Use the input data and feature view features to create new features diff --git a/docs/how-to-guides/adding-a-new-offline-store.md b/docs/how-to-guides/adding-a-new-offline-store.md index 27b70421c3..ee2a8ff931 100644 --- a/docs/how-to-guides/adding-a-new-offline-store.md +++ b/docs/how-to-guides/adding-a-new-offline-store.md @@ -246,7 +246,7 @@ Finally, the custom data source class can be use in the feature repo to define a ```python pdriver_hourly_stats = CustomFileDataSource( path="feature_repo/data/driver_stats.parquet", - event_timestamp_column="event_timestamp", + timestamp_field="event_timestamp", created_timestamp_column="created", ) diff --git a/examples/java-demo/feature_repo/driver_repo.py b/examples/java-demo/feature_repo/driver_repo.py index d7ce41134a..ce9469647f 100644 --- a/examples/java-demo/feature_repo/driver_repo.py +++ b/examples/java-demo/feature_repo/driver_repo.py @@ -5,6 +5,7 @@ from feast.request_feature_view import RequestFeatureView from feast.types import Float32, Float64, Int64, String from google.protobuf.duration_pb2 import Duration +from feast.field import Field from feast import Entity, Feature, FeatureView, FileSource, ValueType @@ -33,7 +34,10 @@ # available at request time (e.g. part of the user initiated HTTP request) input_request = RequestSource( name="vals_to_add", - schema={"val_to_add": ValueType.INT64, "val_to_add_2": ValueType.INT64}, + schema=[ + Field(name="val_to_add", dtype=Int64), + Field(name="val_to_add_2", dtype=Int64), + ], ) # Define an on demand feature view which can generate new features based on @@ -59,6 +63,9 @@ def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame: driver_age_request_fv = RequestFeatureView( name="driver_age", request_data_source=RequestSource( - name="driver_age", schema={"driver_age": ValueType.INT64,} + name="driver_age", + schema=[ + Field(name="driver_age", dtype=Int64), + ], ), ) diff --git a/go/cmd/server/logging/feature_repo/data/online_store.db b/go/cmd/server/logging/feature_repo/data/online_store.db new file mode 100644 index 0000000000..b6ccea139e Binary files /dev/null and b/go/cmd/server/logging/feature_repo/data/online_store.db differ diff --git a/go/internal/feast/model/ondemandfeatureview.go b/go/internal/feast/model/ondemandfeatureview.go index b8d28c6d7e..adf9235607 100644 --- a/go/internal/feast/model/ondemandfeatureview.go +++ b/go/internal/feast/model/ondemandfeatureview.go @@ -55,8 +55,8 @@ func (fs *OnDemandFeatureView) ProjectWithFeatures(featureNames []string) (*OnDe func (fs *OnDemandFeatureView) GetRequestDataSchema() map[string]types.ValueType_Enum { schema := make(map[string]types.ValueType_Enum) for _, requestDataSource := range fs.SourceRequestDataSources { - for fieldName, fieldValueType := range requestDataSource.Schema { - schema[fieldName] = fieldValueType + for _, featureSpec := range requestDataSource.Schema { + schema[featureSpec.Name] = featureSpec.ValueType } } return schema diff --git a/java/serving/src/test/resources/docker-compose/feast10/definitions.py b/java/serving/src/test/resources/docker-compose/feast10/definitions.py index 35c5a2c127..8ffc77c33c 100644 --- a/java/serving/src/test/resources/docker-compose/feast10/definitions.py +++ b/java/serving/src/test/resources/docker-compose/feast10/definitions.py @@ -8,7 +8,6 @@ from feast.types import Float32, Float64, Int64 from feast.value_type import ValueType from google.protobuf.duration_pb2 import Duration - from feast import FileSource file_path = "driver_stats.parquet" diff --git a/protos/feast/core/DataSource.proto b/protos/feast/core/DataSource.proto index 5bd91e5703..d958281ca2 100644 --- a/protos/feast/core/DataSource.proto +++ b/protos/feast/core/DataSource.proto @@ -24,6 +24,7 @@ option java_package = "feast.proto.core"; import "feast/core/DataFormat.proto"; import "feast/types/Value.proto"; +import "feast/core/Feature.proto"; // Defines a Data Source that can be used source Feature data // Next available id: 28 @@ -212,7 +213,10 @@ message DataSource { message RequestDataOptions { reserved 1; // Mapping of feature name to type - map schema = 2; + map deprecated_schema = 2; + + repeated FeatureSpecV2 schema = 3; + } // Defines options for DataSource that supports pushing data to it. This allows data to be pushed to diff --git a/protos/feast/types/Field.proto b/protos/feast/types/Field.proto index 57c53b3b6c..8349263cc6 100644 --- a/protos/feast/types/Field.proto +++ b/protos/feast/types/Field.proto @@ -26,5 +26,5 @@ option go_package = "github.com/feast-dev/feast/go/protos/feast/types"; message Field { string name = 1; - feast.types.Value value = 2; + feast.types.ValueType.Enum value = 2; } diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index db7764493c..fb0a1701a3 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -15,14 +15,16 @@ import enum import warnings from abc import ABC, abstractmethod -from typing import Any, Callable, Dict, Iterable, Optional, Tuple +from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union from google.protobuf.json_format import MessageToJson from feast import type_map from feast.data_format import StreamFormat +from feast.field import Field from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto from feast.repo_config import RepoConfig, get_data_source_class_from_type +from feast.types import VALUE_TYPES_TO_FEAST_TYPES from feast.value_type import ValueType @@ -449,7 +451,7 @@ class RequestSource(DataSource): Args: name: Name of the request data source - schema: Schema mapping from the input feature name to a ValueType + schema Union[Dict[str, ValueType], List[Field]]: Schema mapping from the input feature name to a ValueType description (optional): A human-readable description. tags (optional): A dictionary of key-value pairs to store arbitrary metadata. owner (optional): The owner of the request data source, typically the email of the primary @@ -457,19 +459,37 @@ class RequestSource(DataSource): """ name: str - schema: Dict[str, ValueType] + schema: List[Field] def __init__( self, name: str, - schema: Dict[str, ValueType], + schema: Union[Dict[str, ValueType], List[Field]], description: Optional[str] = "", tags: Optional[Dict[str, str]] = None, owner: Optional[str] = "", ): """Creates a RequestSource object.""" super().__init__(name=name, description=description, tags=tags, owner=owner) - self.schema = schema + if isinstance(schema, Dict): + warnings.warn( + "Schema in RequestSource is changing type. The schema data type Dict[str, ValueType] is being deprecated in Feast 0.23. " + "Please use List[Field] instead for the schema", + DeprecationWarning, + ) + schemaList = [] + for key, valueType in schema.items(): + schemaList.append( + Field(name=key, dtype=VALUE_TYPES_TO_FEAST_TYPES[valueType]) + ) + self.schema = schemaList + elif isinstance(schema, List): + self.schema = schema + else: + raise Exception( + "Schema type must be either dictionary or list, not " + + str(type(schema)) + ) def validate(self, config: RepoConfig): pass @@ -479,33 +499,86 @@ def get_table_column_names_and_types( ) -> Iterable[Tuple[str, str]]: pass + def __eq__(self, other): + if not isinstance(other, RequestSource): + raise TypeError( + "Comparisons should only involve RequestSource class objects." + ) + if ( + self.name != other.name + or self.description != other.description + or self.owner != other.owner + or self.tags != other.tags + ): + return False + if isinstance(self.schema, List) and isinstance(other.schema, List): + for field1, field2 in zip(self.schema, other.schema): + if field1 != field2: + return False + return True + else: + return False + + def __hash__(self): + return super().__hash__() + @staticmethod def from_proto(data_source: DataSourceProto): + + deprecated_schema = data_source.request_data_options.deprecated_schema schema_pb = data_source.request_data_options.schema - schema = {} - for key, val in schema_pb.items(): - schema[key] = ValueType(val) - return RequestSource( - name=data_source.name, - schema=schema, - description=data_source.description, - tags=dict(data_source.tags), - owner=data_source.owner, - ) + + if deprecated_schema and not schema_pb: + warnings.warn( + "Schema in RequestSource is changing type. The schema data type Dict[str, ValueType] is being deprecated in Feast 0.23. " + "Please use List[Field] instead for the schema", + DeprecationWarning, + ) + dict_schema = {} + for key, val in deprecated_schema.items(): + dict_schema[key] = ValueType(val) + return RequestSource( + name=data_source.name, + schema=dict_schema, + description=data_source.description, + tags=dict(data_source.tags), + owner=data_source.owner, + ) + else: + list_schema = [] + for field_proto in schema_pb: + list_schema.append(Field.from_proto(field_proto)) + + return RequestSource( + name=data_source.name, + schema=list_schema, + description=data_source.description, + tags=dict(data_source.tags), + owner=data_source.owner, + ) def to_proto(self) -> DataSourceProto: - schema_pb = {} - for key, value in self.schema.items(): - schema_pb[key] = value.value - options = DataSourceProto.RequestDataOptions(schema=schema_pb) + + schema_pb = [] + + if isinstance(self.schema, Dict): + for key, value in self.schema.items(): + schema_pb.append( + Field( + name=key, dtype=VALUE_TYPES_TO_FEAST_TYPES[value.value] + ).to_proto() + ) + else: + for field in self.schema: + schema_pb.append(field.to_proto()) data_source_proto = DataSourceProto( name=self.name, type=DataSourceProto.REQUEST_SOURCE, - request_data_options=options, description=self.description, tags=self.tags, owner=self.owner, ) + data_source_proto.request_data_options.schema.extend(schema_pb) return data_source_proto diff --git a/sdk/python/feast/field.py b/sdk/python/feast/field.py index 2198d8a3f1..f6c88f1850 100644 --- a/sdk/python/feast/field.py +++ b/sdk/python/feast/field.py @@ -74,7 +74,7 @@ def from_proto(cls, field_proto: FieldProto): field_proto: FieldProto protobuf object """ value_type = ValueType(field_proto.value_type) - return cls(name=field_proto.name, dtype=from_value_type(value_type)) + return cls(name=field_proto.name, dtype=from_value_type(value_type=value_type)) @classmethod def from_feature(cls, feature: Feature): diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index a453cbb4db..790891b078 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -352,7 +352,17 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): def get_request_data_schema(self) -> Dict[str, ValueType]: schema: Dict[str, ValueType] = {} for request_source in self.source_request_sources.values(): - schema.update(request_source.schema) + if isinstance(request_source.schema, List): + new_schema = {} + for field in request_source.schema: + new_schema[field.name] = field.dtype.to_value_type() + schema.update(new_schema) + elif isinstance(request_source.schema, Dict): + schema.update(request_source.schema) + else: + raise Exception( + f"Request source schema is not correct type: ${str(type(request_source.schema))}" + ) return schema def get_transformed_features_df( @@ -409,9 +419,9 @@ def infer_features(self): ) df[f"{feature.name}"] = pd.Series(dtype=dtype) for request_data in self.source_request_sources.values(): - for feature_name, feature_type in request_data.schema.items(): - dtype = feast_value_type_to_pandas_type(feature_type) - df[f"{feature_name}"] = pd.Series(dtype=dtype) + for field in request_data.schema: + dtype = feast_value_type_to_pandas_type(field.dtype.to_value_type()) + df[f"{field.name}"] = pd.Series(dtype=dtype) output_df: pd.DataFrame = self.udf.__call__(df) inferred_features = [] for f, dt in zip(output_df.columns, output_df.dtypes): diff --git a/sdk/python/feast/request_feature_view.py b/sdk/python/feast/request_feature_view.py index 863a4b4964..7248ffe989 100644 --- a/sdk/python/feast/request_feature_view.py +++ b/sdk/python/feast/request_feature_view.py @@ -5,7 +5,7 @@ from feast.base_feature_view import BaseFeatureView from feast.data_source import RequestSource from feast.feature_view_projection import FeatureViewProjection -from feast.field import Field, from_value_type +from feast.field import Field from feast.protos.feast.core.RequestFeatureView_pb2 import ( RequestFeatureView as RequestFeatureViewProto, ) @@ -63,12 +63,17 @@ def __init__( DeprecationWarning, ) + if isinstance(request_data_source.schema, Dict): + new_features = [ + Field(name=name, dtype=dtype) + for name, dtype in request_data_source.schema.items() + ] + else: + new_features = request_data_source.schema + super().__init__( name=name, - features=[ - Field(name=name, dtype=from_value_type(value_type)) - for name, value_type in request_data_source.schema.items() - ], + features=new_features, description=description, tags=tags, owner=owner, diff --git a/sdk/python/feast/types.py b/sdk/python/feast/types.py index 29a49a72c0..40c1d62e7d 100644 --- a/sdk/python/feast/types.py +++ b/sdk/python/feast/types.py @@ -80,6 +80,15 @@ def to_value_type(self) -> ValueType: def __str__(self): return PRIMITIVE_FEAST_TYPES_TO_STRING[self.name] + def __eq__(self, other): + if isinstance(other, PrimitiveFeastType): + return self.value == other.value + else: + return False + + def __hash__(self): + return hash((PRIMITIVE_FEAST_TYPES_TO_STRING[self.name])) + Invalid = PrimitiveFeastType.INVALID Bytes = PrimitiveFeastType.BYTES diff --git a/sdk/python/tests/integration/feature_repos/universal/feature_views.py b/sdk/python/tests/integration/feature_repos/universal/feature_views.py index 8a89a81128..5992222644 100644 --- a/sdk/python/tests/integration/feature_repos/universal/feature_views.py +++ b/sdk/python/tests/integration/feature_repos/universal/feature_views.py @@ -123,7 +123,9 @@ def similarity_feature_view( def create_conv_rate_request_source(): - return RequestSource(name="conv_rate_input", schema={"val_to_add": ValueType.INT32}) + return RequestSource( + name="conv_rate_input", schema=[Field(name="val_to_add", dtype=Int32)], + ) def create_similarity_request_source(): diff --git a/sdk/python/tests/integration/registration/test_inference.py b/sdk/python/tests/integration/registration/test_inference.py index c0358abb59..526f422e9d 100644 --- a/sdk/python/tests/integration/registration/test_inference.py +++ b/sdk/python/tests/integration/registration/test_inference.py @@ -29,7 +29,7 @@ SparkSource, ) from feast.on_demand_feature_view import on_demand_feature_view -from feast.types import String, UnixTimestamp +from feast.types import PrimitiveFeastType, String, UnixTimestamp from tests.utils.data_source_utils import ( prep_file_source, simple_bq_source_using_query_arg, @@ -168,7 +168,8 @@ def test_update_data_sources_with_inferred_event_timestamp_col(universal_data_so def test_on_demand_features_type_inference(): # Create Feature Views date_request = RequestSource( - name="date_request", schema={"some_date": ValueType.UNIX_TIMESTAMP} + name="date_request", + schema=[Field(name="some_date", dtype=PrimitiveFeastType.UNIX_TIMESTAMP)], ) @on_demand_feature_view( @@ -224,11 +225,17 @@ def test_view_with_missing_feature(features_df: pd.DataFrame) -> pd.DataFrame: test_view_with_missing_feature.infer_features() -def test_datasource_inference(): +# TODO(kevjumba): remove this in feast 0.23 when deprecating +@pytest.mark.parametrize( + "request_source_schema", + [ + [Field(name="some_date", dtype=PrimitiveFeastType.UNIX_TIMESTAMP)], + {"some_date": ValueType.UNIX_TIMESTAMP}, + ], +) +def test_datasource_inference(request_source_schema): # Create Feature Views - date_request = RequestSource( - name="date_request", schema={"some_date": ValueType.UNIX_TIMESTAMP} - ) + date_request = RequestSource(name="date_request", schema=request_source_schema,) @on_demand_feature_view( # Note: we deliberately use positional arguments here to test that they work correctly, diff --git a/sdk/python/tests/integration/registration/test_registry.py b/sdk/python/tests/integration/registration/test_registry.py index 25b0e7714e..072be15bfe 100644 --- a/sdk/python/tests/integration/registration/test_registry.py +++ b/sdk/python/tests/integration/registration/test_registry.py @@ -29,7 +29,7 @@ from feast.protos.feast.types import Value_pb2 as ValueProto from feast.registry import Registry from feast.repo_config import RegistryConfig -from feast.types import Array, Bytes, Float32, Int32, Int64, String +from feast.types import Array, Bytes, Float32, Int32, Int64, PrimitiveFeastType, String from feast.value_type import ValueType @@ -237,7 +237,15 @@ def test_apply_feature_view_success(test_registry): @pytest.mark.parametrize( "test_registry", [lazy_fixture("local_registry")], ) -def test_modify_feature_views_success(test_registry): +# TODO(kevjumba): remove this in feast 0.23 when deprecating +@pytest.mark.parametrize( + "request_source_schema", + [ + [Field(name="my_input_1", dtype=PrimitiveFeastType.INT32)], + {"my_input_1": ValueType.INT32}, + ], +) +def test_modify_feature_views_success(test_registry, request_source_schema): # Create Feature Views batch_source = FileSource( file_format=ParquetFormat(), @@ -246,9 +254,7 @@ def test_modify_feature_views_success(test_registry): created_timestamp_column="timestamp", ) - request_source = RequestSource( - name="request_source", schema={"my_input_1": ValueType.INT32} - ) + request_source = RequestSource(name="request_source", schema=request_source_schema,) fv1 = FeatureView( name="my_feature_view_1", diff --git a/sdk/python/tests/unit/test_data_sources.py b/sdk/python/tests/unit/test_data_sources.py index f32089b3b9..49015e6ad1 100644 --- a/sdk/python/tests/unit/test_data_sources.py +++ b/sdk/python/tests/unit/test_data_sources.py @@ -2,7 +2,9 @@ from feast import ValueType from feast.data_source import PushSource, RequestDataSource, RequestSource +from feast.field import Field from feast.infra.offline_stores.bigquery_source import BigQuerySource +from feast.types import PrimitiveFeastType def test_push_with_batch(): @@ -30,5 +32,17 @@ def test_request_data_source_deprecation(): ) request_data_source_proto = request_data_source.to_proto() returned_request_source = RequestSource.from_proto(request_data_source_proto) - assert returned_request_source.name == request_data_source.name - assert returned_request_source.schema == request_data_source.schema + assert returned_request_source == request_data_source + + +def test_request_source_primitive_type_to_proto(): + schema = [ + Field(name="f1", dtype=PrimitiveFeastType.FLOAT32), + Field(name="f2", dtype=PrimitiveFeastType.BOOL), + ] + request_source = RequestSource( + name="source", schema=schema, description="desc", tags={}, owner="feast", + ) + request_proto = request_source.to_proto() + deserialized_request_source = RequestSource.from_proto(request_proto) + assert deserialized_request_source == request_source diff --git a/ui/feature_repo/features.py b/ui/feature_repo/features.py index dfc23cc394..0f74b424e7 100644 --- a/ui/feature_repo/features.py +++ b/ui/feature_repo/features.py @@ -14,6 +14,7 @@ from feast.data_source import RequestSource from feast.request_feature_view import RequestFeatureView from feast.on_demand_feature_view import on_demand_feature_view +from feast.field import Field import pandas as pd zipcode = Entity( @@ -130,7 +131,10 @@ # Define a request data source which encodes features / information only # available at request time (e.g. part of the user initiated HTTP request) input_request = RequestSource( - name="transaction", schema={"transaction_amt": ValueType.INT64}, + name="transaction", + schema=[ + Field(name="transaction_amt", dtype=Int64), + ], ) # Define an on demand feature view which can generate new features based on