diff --git a/examples/java-demo/feature_repo/driver_repo.py b/examples/java-demo/feature_repo/driver_repo.py index e17a5d9cf8..c91e5a40be 100644 --- a/examples/java-demo/feature_repo/driver_repo.py +++ b/examples/java-demo/feature_repo/driver_repo.py @@ -7,14 +7,14 @@ from google.protobuf.duration_pb2 import Duration from feast.field import Field -from feast import Entity, Feature, BatchFeatureView, FileSource +from feast import Entity, Feature, BatchFeatureView, FileSource, ValueType driver_hourly_stats = FileSource( path="data/driver_stats_with_string.parquet", timestamp_field="event_timestamp", created_timestamp_column="created", ) -driver = Entity(name="driver_id", description="driver id",) +driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",) driver_hourly_stats_view = BatchFeatureView( name="driver_hourly_stats", entities=["driver_id"], diff --git a/go/embedded/online_features.go b/go/embedded/online_features.go index 710f60dca8..7cd1e4ed81 100644 --- a/go/embedded/online_features.go +++ b/go/embedded/online_features.go @@ -80,6 +80,12 @@ func (s *OnlineFeatureService) GetEntityTypesMap(featureRefs []string) (map[stri viewNames[viewName] = nil } + entities, _ := s.fs.ListEntities(true) + entitiesByName := make(map[string]*model.Entity) + for _, entity := range entities { + entitiesByName[entity.Name] = entity + } + joinKeyTypes := make(map[string]int32) for viewName := range viewNames { @@ -88,8 +94,9 @@ func (s *OnlineFeatureService) GetEntityTypesMap(featureRefs []string) (map[stri // skip on demand feature views continue } - for _, entityColumn := range view.EntityColumns { - joinKeyTypes[entityColumn.Name] = int32(entityColumn.Dtype.Number()) + for _, entityName := range view.Entities { + entity := entitiesByName[entityName] + joinKeyTypes[entity.JoinKey] = int32(entity.ValueType.Number()) } } @@ -104,14 +111,21 @@ func (s *OnlineFeatureService) GetEntityTypesMapByFeatureService(featureServiceN joinKeyTypes := make(map[string]int32) + entities, _ := s.fs.ListEntities(true) + entitiesByName := make(map[string]*model.Entity) + for _, entity := range entities { + entitiesByName[entity.Name] = entity + } + for _, projection := range featureService.Projections { view, err := s.fs.GetFeatureView(projection.Name, true) if err != nil { // skip on demand feature views continue } - for _, entityColumn := range view.EntityColumns { - joinKeyTypes[entityColumn.Name] = int32(entityColumn.Dtype.Number()) + for _, entityName := range view.Entities { + entity := entitiesByName[entityName] + joinKeyTypes[entity.JoinKey] = int32(entity.ValueType.Number()) } } diff --git a/go/internal/feast/featurestore.go b/go/internal/feast/featurestore.go index b0fc987fb4..4ecd781b74 100644 --- a/go/internal/feast/featurestore.go +++ b/go/internal/feast/featurestore.go @@ -132,7 +132,7 @@ func (fs *FeatureStore) GetOnlineFeatures( if entitylessCase { dummyEntityColumn := &prototypes.RepeatedValue{Val: make([]*prototypes.Value, numRows)} for index := 0; index < numRows; index++ { - dummyEntityColumn.Val[index] = &model.DUMMY_ENTITY_VALUE + dummyEntityColumn.Val[index] = &model.DUMMY_ENTITY } joinKeyToEntityValues[model.DUMMY_ENTITY_ID] = dummyEntityColumn } @@ -272,7 +272,7 @@ func (fs *FeatureStore) GetFeatureView(featureViewName string, hideDummyEntity b return nil, err } if fv.HasEntity(model.DUMMY_ENTITY_NAME) && hideDummyEntity { - fv.EntityNames = []string{} + fv.Entities = []string{} } return fv, nil } diff --git a/go/internal/feast/model/basefeatureview.go b/go/internal/feast/model/basefeatureview.go index 1bdf614c25..28ef7231fd 100644 --- a/go/internal/feast/model/basefeatureview.go +++ b/go/internal/feast/model/basefeatureview.go @@ -8,15 +8,15 @@ import ( type BaseFeatureView struct { Name string - Features []*Field + Features []*Feature Projection *FeatureViewProjection } func NewBaseFeatureView(name string, featureProtos []*core.FeatureSpecV2) *BaseFeatureView { base := &BaseFeatureView{Name: name} - features := make([]*Field, len(featureProtos)) + features := make([]*Feature, len(featureProtos)) for index, featureSpecV2 := range featureProtos { - features[index] = NewFieldFromProto(featureSpecV2) + features[index] = NewFeatureFromProto(featureSpecV2) } base.Features = features base.Projection = NewFeatureViewProjectionFromDefinition(base) @@ -43,7 +43,7 @@ func (fv *BaseFeatureView) WithProjection(projection *FeatureViewProjection) (*B } func (fv *BaseFeatureView) ProjectWithFeatures(featureNames []string) *FeatureViewProjection { - features := make([]*Field, 0) + features := make([]*Feature, 0) for _, feature := range fv.Features { for _, allowedFeatureName := range featureNames { if feature.Name == allowedFeatureName { diff --git a/go/internal/feast/model/entity.go b/go/internal/feast/model/entity.go index 5a09edb655..ac3a5d5f26 100644 --- a/go/internal/feast/model/entity.go +++ b/go/internal/feast/model/entity.go @@ -2,16 +2,18 @@ package model import ( "github.com/feast-dev/feast/go/protos/feast/core" + "github.com/feast-dev/feast/go/protos/feast/types" ) type Entity struct { - Name string - JoinKey string + Name string + ValueType types.ValueType_Enum + JoinKey string } func NewEntityFromProto(proto *core.Entity) *Entity { - return &Entity{ - Name: proto.Spec.Name, - JoinKey: proto.Spec.JoinKey, + return &Entity{Name: proto.Spec.Name, + ValueType: proto.Spec.ValueType, + JoinKey: proto.Spec.JoinKey, } } diff --git a/go/internal/feast/model/field.go b/go/internal/feast/model/feature.go similarity index 63% rename from go/internal/feast/model/field.go rename to go/internal/feast/model/feature.go index 4f72d34686..d833a8901b 100644 --- a/go/internal/feast/model/field.go +++ b/go/internal/feast/model/feature.go @@ -5,14 +5,13 @@ import ( "github.com/feast-dev/feast/go/protos/feast/types" ) -type Field struct { +type Feature struct { Name string Dtype types.ValueType_Enum } -func NewFieldFromProto(proto *core.FeatureSpecV2) *Field { - return &Field{ - Name: proto.Name, +func NewFeatureFromProto(proto *core.FeatureSpecV2) *Feature { + return &Feature{Name: proto.Name, Dtype: proto.ValueType, } } diff --git a/go/internal/feast/model/featureview.go b/go/internal/feast/model/featureview.go index ceb3736f99..6c198f9994 100644 --- a/go/internal/feast/model/featureview.go +++ b/go/internal/feast/model/featureview.go @@ -13,13 +13,12 @@ const ( DUMMY_ENTITY_VAL = "" ) -var DUMMY_ENTITY_VALUE types.Value = types.Value{Val: &types.Value_StringVal{StringVal: DUMMY_ENTITY_VAL}} +var DUMMY_ENTITY types.Value = types.Value{Val: &types.Value_StringVal{StringVal: DUMMY_ENTITY_VAL}} type FeatureView struct { - Base *BaseFeatureView - Ttl *durationpb.Duration - EntityNames []string - EntityColumns []*Field + Base *BaseFeatureView + Ttl *durationpb.Duration + Entities []string } func NewFeatureViewFromProto(proto *core.FeatureView) *FeatureView { @@ -27,30 +26,25 @@ func NewFeatureViewFromProto(proto *core.FeatureView) *FeatureView { Ttl: &(*proto.Spec.Ttl), } if len(proto.Spec.Entities) == 0 { - featureView.EntityNames = []string{DUMMY_ENTITY_NAME} + featureView.Entities = []string{DUMMY_ENTITY_NAME} } else { - featureView.EntityNames = proto.Spec.Entities + featureView.Entities = proto.Spec.Entities } - entityColumns := make([]*Field, len(proto.Spec.EntityColumns)) - for i, entityColumn := range proto.Spec.EntityColumns { - entityColumns[i] = NewFieldFromProto(entityColumn) - } - featureView.EntityColumns = entityColumns return featureView } -func (fv *FeatureView) NewFeatureViewFromBase(base *BaseFeatureView) *FeatureView { - ttl := durationpb.Duration{Seconds: fv.Ttl.Seconds, Nanos: fv.Ttl.Nanos} +func (fs *FeatureView) NewFeatureViewFromBase(base *BaseFeatureView) *FeatureView { + ttl := durationpb.Duration{Seconds: fs.Ttl.Seconds, Nanos: fs.Ttl.Nanos} featureView := &FeatureView{Base: base, - Ttl: &ttl, - EntityNames: fv.EntityNames, + Ttl: &ttl, + Entities: fs.Entities, } return featureView } -func (fv *FeatureView) HasEntity(name string) bool { - for _, entityName := range fv.EntityNames { - if entityName == name { +func (fs *FeatureView) HasEntity(lookup string) bool { + for _, entityName := range fs.Entities { + if entityName == lookup { return true } } diff --git a/go/internal/feast/model/featureviewprojection.go b/go/internal/feast/model/featureviewprojection.go index fe54774ff1..e80e8844ed 100644 --- a/go/internal/feast/model/featureviewprojection.go +++ b/go/internal/feast/model/featureviewprojection.go @@ -7,7 +7,7 @@ import ( type FeatureViewProjection struct { Name string NameAlias string - Features []*Field + Features []*Feature JoinKeyMap map[string]string } @@ -24,9 +24,9 @@ func NewFeatureViewProjectionFromProto(proto *core.FeatureViewProjection) *Featu JoinKeyMap: proto.JoinKeyMap, } - features := make([]*Field, len(proto.FeatureColumns)) + features := make([]*Feature, len(proto.FeatureColumns)) for index, featureSpecV2 := range proto.FeatureColumns { - features[index] = NewFieldFromProto(featureSpecV2) + features[index] = NewFeatureFromProto(featureSpecV2) } featureProjection.Features = features return featureProjection diff --git a/go/internal/feast/onlineserving/serving.go b/go/internal/feast/onlineserving/serving.go index e2a2df923b..1d0567c354 100644 --- a/go/internal/feast/onlineserving/serving.go +++ b/go/internal/feast/onlineserving/serving.go @@ -251,7 +251,7 @@ func GetEntityMaps(requestedFeatureViews []*FeatureViewAndRefs, entities []*mode joinKeyToAliasMap = map[string]string{} } - for _, entityName := range featureView.EntityNames { + for _, entityName := range featureView.Entities { joinKey := entitiesByName[entityName].JoinKey entityNameToJoinKeyMap[entityName] = joinKey @@ -518,7 +518,7 @@ func GroupFeatureRefs(requestedFeatureViews []*FeatureViewAndRefs, joinKeys := make([]string, 0) fv := featuresAndView.View featureNames := featuresAndView.FeatureRefs - for _, entityName := range fv.EntityNames { + for _, entityName := range fv.Entities { joinKeys = append(joinKeys, entityNameToJoinKeyMap[entityName]) } diff --git a/go/internal/feast/onlineserving/serving_test.go b/go/internal/feast/onlineserving/serving_test.go index bd4e45a21e..0a00f546f9 100644 --- a/go/internal/feast/onlineserving/serving_test.go +++ b/go/internal/feast/onlineserving/serving_test.go @@ -20,19 +20,19 @@ func TestGroupingFeatureRefs(t *testing.T) { NameAlias: "aliasViewA", }, }, - EntityNames: []string{"driver", "customer"}, + Entities: []string{"driver", "customer"}, } viewB := &model.FeatureView{ - Base: &model.BaseFeatureView{Name: "viewB"}, - EntityNames: []string{"driver", "customer"}, + Base: &model.BaseFeatureView{Name: "viewB"}, + Entities: []string{"driver", "customer"}, } viewC := &model.FeatureView{ - Base: &model.BaseFeatureView{Name: "viewC"}, - EntityNames: []string{"driver"}, + Base: &model.BaseFeatureView{Name: "viewC"}, + Entities: []string{"driver"}, } viewD := &model.FeatureView{ - Base: &model.BaseFeatureView{Name: "viewD"}, - EntityNames: []string{"customer"}, + Base: &model.BaseFeatureView{Name: "viewD"}, + Entities: []string{"customer"}, } refGroups, _ := GroupFeatureRefs( []*FeatureViewAndRefs{ @@ -105,11 +105,11 @@ func TestGroupingFeatureRefsWithJoinKeyAliases(t *testing.T) { JoinKeyMap: map[string]string{"location_id": "destination_id"}, }, }, - EntityNames: []string{"location"}, + Entities: []string{"location"}, } viewB := &model.FeatureView{ - Base: &model.BaseFeatureView{Name: "viewB"}, - EntityNames: []string{"location"}, + Base: &model.BaseFeatureView{Name: "viewB"}, + Entities: []string{"location"}, } refGroups, _ := GroupFeatureRefs( @@ -164,7 +164,7 @@ func TestGroupingFeatureRefsWithMissingKey(t *testing.T) { JoinKeyMap: map[string]string{"location_id": "destination_id"}, }, }, - EntityNames: []string{"location"}, + Entities: []string{"location"}, } _, err := GroupFeatureRefs( diff --git a/go/internal/feast/server/logging/featureserviceschema.go b/go/internal/feast/server/logging/featureserviceschema.go index 2779982fc0..5047346c2c 100644 --- a/go/internal/feast/server/logging/featureserviceschema.go +++ b/go/internal/feast/server/logging/featureserviceschema.go @@ -52,12 +52,13 @@ func generateSchema(featureService *model.FeatureService, entityMap map[string]* features = append(features, fullFeatureName) allFeatureTypes[fullFeatureName] = f.Dtype } - for _, entityColumn := range fv.EntityColumns { + for _, entityName := range fv.Entities { + entity := entityMap[entityName] var joinKey string - if joinKeyAlias, ok := featureProjection.JoinKeyMap[entityColumn.Name]; ok { + if joinKeyAlias, ok := featureProjection.JoinKeyMap[entity.JoinKey]; ok { joinKey = joinKeyAlias } else { - joinKey = entityColumn.Name + joinKey = entity.JoinKey } if _, ok := joinKeysSet[joinKey]; !ok { @@ -65,7 +66,7 @@ func generateSchema(featureService *model.FeatureService, entityMap map[string]* } joinKeysSet[joinKey] = nil - entityJoinKeyToType[joinKey] = entityColumn.Dtype + entityJoinKeyToType[joinKey] = entity.ValueType } } else if odFv, ok := odFvMap[featureViewName]; ok { for _, f := range featureProjection.Features { diff --git a/go/internal/feast/server/logging/featureserviceschema_test.go b/go/internal/feast/server/logging/featureserviceschema_test.go index 6fa1c12e24..efcd5ec7fc 100644 --- a/go/internal/feast/server/logging/featureserviceschema_test.go +++ b/go/internal/feast/server/logging/featureserviceschema_test.go @@ -74,10 +74,9 @@ func TestSchemaRetrievalIgnoresEntitiesNotInFeatureService(t *testing.T) { featureService, entities, fvs, odfvs := InitializeFeatureRepoVariablesForTest() entityMap, fvMap, odFvMap := buildFCOMaps(entities, fvs, odfvs) - // Remove entities in featureservice + //Remove entities in featureservice for _, featureView := range fvs { - featureView.EntityNames = []string{} - featureView.EntityColumns = []*model.Field{} + featureView.Entities = []string{} } schema, err := generateSchema(featureService, entityMap, fvMap, odFvMap) @@ -127,69 +126,65 @@ func TestSchemaUsesOrderInFeatureService(t *testing.T) { // Initialize all dummy featureservice, entities and featureviews/on demand featureviews for testing. func InitializeFeatureRepoVariablesForTest() (*model.FeatureService, []*model.Entity, []*model.FeatureView, []*model.OnDemandFeatureView) { - f1 := test.CreateNewField( + f1 := test.CreateNewFeature( "int64", types.ValueType_INT64, ) - f2 := test.CreateNewField( + f2 := test.CreateNewFeature( "float32", types.ValueType_FLOAT, ) projection1 := test.CreateNewFeatureViewProjection( "featureView1", "", - []*model.Field{f1, f2}, + []*model.Feature{f1, f2}, map[string]string{}, ) baseFeatureView1 := test.CreateBaseFeatureView( "featureView1", - []*model.Field{f1, f2}, + []*model.Feature{f1, f2}, projection1, ) - entity1 := test.CreateNewEntity("driver_id", "driver_id") - entitycolumn1 := test.CreateNewField( - "driver_id", - types.ValueType_INT64, - ) - featureView1 := test.CreateFeatureView(baseFeatureView1, nil, []string{"driver_id"}, []*model.Field{entitycolumn1}) - f3 := test.CreateNewField( + featureView1 := test.CreateFeatureView(baseFeatureView1, nil, []string{"driver_id"}) + entity1 := test.CreateNewEntity("driver_id", types.ValueType_INT64, "driver_id") + f3 := test.CreateNewFeature( "int32", types.ValueType_INT32, ) - f4 := test.CreateNewField( + f4 := test.CreateNewFeature( "double", types.ValueType_DOUBLE, ) projection2 := test.CreateNewFeatureViewProjection( "featureView2", "", - []*model.Field{f3, f4}, + []*model.Feature{f3, f4}, map[string]string{}, ) baseFeatureView2 := test.CreateBaseFeatureView( "featureView2", - []*model.Field{f3, f4}, + []*model.Feature{f3, f4}, projection2, ) - featureView2 := test.CreateFeatureView(baseFeatureView2, nil, []string{"driver_id"}, []*model.Field{entitycolumn1}) + featureView2 := test.CreateFeatureView(baseFeatureView2, nil, []string{"driver_id"}) - f5 := test.CreateNewField( + f5 := test.CreateNewFeature( "odfv_f1", types.ValueType_INT32, ) - f6 := test.CreateNewField( + f6 := test.CreateNewFeature( "odfv_f2", types.ValueType_DOUBLE, ) projection3 := test.CreateNewFeatureViewProjection( "od_bf1", "", - []*model.Field{f5, f6}, + []*model.Feature{f5, f6}, map[string]string{}, ) od_bf1 := test.CreateBaseFeatureView( "od_bf1", - []*model.Field{f5, f6}, + []*model.Feature{f5, f6}, projection3, ) odfv := model.NewOnDemandFeatureViewFromBase(od_bf1) diff --git a/go/internal/test/go_integration_test_utils.go b/go/internal/test/go_integration_test_utils.go index 275edc7b98..eb727ba1db 100644 --- a/go/internal/test/go_integration_test_utils.go +++ b/go/internal/test/go_integration_test_utils.go @@ -193,7 +193,7 @@ func GetProtoFromRecord(rec arrow.Record) (map[string]*types.RepeatedValue, erro return r, nil } -func CreateBaseFeatureView(name string, features []*model.Field, projection *model.FeatureViewProjection) *model.BaseFeatureView { +func CreateBaseFeatureView(name string, features []*model.Feature, projection *model.FeatureViewProjection) *model.BaseFeatureView { return &model.BaseFeatureView{ Name: name, Features: features, @@ -201,15 +201,16 @@ func CreateBaseFeatureView(name string, features []*model.Field, projection *mod } } -func CreateNewEntity(name string, joinKey string) *model.Entity { +func CreateNewEntity(name string, valueType types.ValueType_Enum, joinKey string) *model.Entity { return &model.Entity{ - Name: name, - JoinKey: joinKey, + Name: name, + ValueType: valueType, + JoinKey: joinKey, } } -func CreateNewField(name string, dtype types.ValueType_Enum) *model.Field { - return &model.Field{Name: name, +func CreateNewFeature(name string, dtype types.ValueType_Enum) *model.Feature { + return &model.Feature{Name: name, Dtype: dtype, } } @@ -224,7 +225,7 @@ func CreateNewFeatureService(name string, project string, createdTimestamp *time } } -func CreateNewFeatureViewProjection(name string, nameAlias string, features []*model.Field, joinKeyMap map[string]string) *model.FeatureViewProjection { +func CreateNewFeatureViewProjection(name string, nameAlias string, features []*model.Feature, joinKeyMap map[string]string) *model.FeatureViewProjection { return &model.FeatureViewProjection{Name: name, NameAlias: nameAlias, Features: features, @@ -232,11 +233,10 @@ func CreateNewFeatureViewProjection(name string, nameAlias string, features []*m } } -func CreateFeatureView(base *model.BaseFeatureView, ttl *durationpb.Duration, entities []string, entityColumns []*model.Field) *model.FeatureView { +func CreateFeatureView(base *model.BaseFeatureView, ttl *durationpb.Duration, entities []string) *model.FeatureView { return &model.FeatureView{ - Base: base, - Ttl: ttl, - EntityNames: entities, - EntityColumns: entityColumns, + Base: base, + Ttl: ttl, + Entities: entities, } } diff --git a/protos/feast/core/FeatureView.proto b/protos/feast/core/FeatureView.proto index c9e38bf344..2662350540 100644 --- a/protos/feast/core/FeatureView.proto +++ b/protos/feast/core/FeatureView.proto @@ -35,7 +35,7 @@ message FeatureView { FeatureViewMeta meta = 2; } -// Next available id: 13 +// Next available id: 12 // TODO(adchia): refactor common fields from this and ODFV into separate metadata proto message FeatureViewSpec { // Name of the feature view. Must be unique. Not updated. @@ -44,15 +44,13 @@ message FeatureViewSpec { // Name of Feast project that this feature view belongs to. string project = 2; - // List of names of entities associated with this feature view. + // List names of entities to associate with the Features defined in this + // Feature View. Not updatable. repeated string entities = 3; - // List of specifications for each feature defined as part of this feature view. + // List of specifications for each field defined as part of this feature view. repeated FeatureSpecV2 features = 4; - // List of specifications for each entity defined as part of this feature view. - repeated FeatureSpecV2 entity_columns = 12; - // Description of the feature view. string description = 10; diff --git a/sdk/python/feast/entity.py b/sdk/python/feast/entity.py index e29ac4f6ef..3aaf0f9b69 100644 --- a/sdk/python/feast/entity.py +++ b/sdk/python/feast/entity.py @@ -31,7 +31,7 @@ class Entity: Attributes: name: The unique name of the entity. - value_type (deprecated): The type of the entity, such as string or float. + value_type: The type of the entity, such as string or float. join_key: A property that uniquely identifies different entities within the collection. The join_key property is typically used for joining entities with their associated features. If not specified, defaults to the name. @@ -60,7 +60,7 @@ def __init__( self, *args, name: Optional[str] = None, - value_type: Optional[ValueType] = None, + value_type: ValueType = ValueType.UNKNOWN, description: str = "", join_key: Optional[str] = None, tags: Optional[Dict[str, str]] = None, @@ -72,7 +72,7 @@ def __init__( Args: name: The unique name of the entity. - value_type (deprecated): The type of the entity, such as string or float. + value_type: The type of the entity, such as string or float. description: A human-readable description. join_key (deprecated): A property that uniquely identifies different entities within the collection. The join_key property is typically used for joining entities @@ -104,23 +104,8 @@ def __init__( if not self.name: raise ValueError("Name needs to be specified") - if value_type: - warnings.warn( - ( - "The `value_type` parameter is being deprecated. Instead, the type of an entity " - "should be specified as a Field in the schema of a feature view. Feast 0.22 and " - "onwards will not support the `value_type` parameter. The `entities` parameter of " - "feature views should also be changed to a List[Entity] instead of a List[str]; if " - "this is not done, entity columns will be mistakenly interpreted as feature columns." - ), - DeprecationWarning, - ) - self.value_type = value_type or ValueType.UNKNOWN + self.value_type = value_type - # For now, both the `join_key` and `join_keys` attributes are set correctly, - # so both are usable. - # TODO(felixwang9817): Remove the usage of `join_key` throughout the codebase - # when the usage of `join_key` as a parameter is removed. if join_key: warnings.warn( ( @@ -140,8 +125,6 @@ def __init__( self.join_key = join_keys[0] else: self.join_key = join_key if join_key else self.name - if not self.join_keys: - self.join_keys = [self.join_key] self.description = description self.tags = tags if tags is not None else {} self.owner = owner @@ -170,9 +153,6 @@ def __eq__(self, other): def __str__(self): return str(MessageToJson(self.to_proto())) - def __lt__(self, other): - return self.name < other.name - def is_valid(self): """ Validates the state of this entity locally. diff --git a/sdk/python/feast/feature_logging.py b/sdk/python/feast/feature_logging.py index e951eaabc3..70fab930bb 100644 --- a/sdk/python/feast/feature_logging.py +++ b/sdk/python/feast/feature_logging.py @@ -14,6 +14,7 @@ from feast.protos.feast.core.FeatureService_pb2 import ( LoggingConfig as LoggingConfigProto, ) +from feast.types import from_value_type if TYPE_CHECKING: from feast import FeatureService @@ -81,11 +82,14 @@ def get_schema(self, registry: "Registry") -> pa.Schema: fields[field.name] = FEAST_TYPE_TO_ARROW_TYPE[field.dtype] else: - for entity_column in feature_view.entity_columns: + for entity_name in feature_view.entities: + entity = registry.get_entity(entity_name, self._project) join_key = projection.join_key_map.get( - entity_column.name, entity_column.name + entity.join_key, entity.join_key ) - fields[join_key] = FEAST_TYPE_TO_ARROW_TYPE[entity_column.dtype] + fields[join_key] = FEAST_TYPE_TO_ARROW_TYPE[ + from_value_type(entity.value_type) + ] # system columns fields[REQUEST_ID_FIELD] = pa.string() diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 5f4918301c..4b015e8ab8 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -64,7 +64,8 @@ ) from feast.inference import ( update_data_sources_with_inferred_event_timestamp_col, - update_feature_views_with_inferred_features_and_entities, + update_entities_with_inferred_types_from_feature_views, + update_feature_views_with_inferred_features, ) from feast.infra.infra_object import Infra from feast.infra.provider import Provider, RetrievalJob, get_provider @@ -253,7 +254,6 @@ def _list_feature_views( ): if hide_dummy_entity and fv.entities[0] == DUMMY_ENTITY_NAME: fv.entities = [] - fv.entity_columns = [] feature_views.append(fv) return feature_views @@ -476,7 +476,11 @@ def _make_inferences( views_to_update: List[FeatureView], odfvs_to_update: List[OnDemandFeatureView], ): - """Makes inferences for data sources, feature views, and odfvs.""" + """Makes inferences for entities, feature views, and odfvs.""" + update_entities_with_inferred_types_from_feature_views( + entities_to_update, views_to_update, self.config + ) + update_data_sources_with_inferred_event_timestamp_col( data_sources_to_update, self.config ) @@ -487,7 +491,7 @@ def _make_inferences( # New feature views may reference previously applied entities. entities = self._list_entities() - update_feature_views_with_inferred_features_and_entities( + update_feature_views_with_inferred_features( views_to_update, entities + entities_to_update, self.config ) @@ -513,11 +517,11 @@ def _plan( Examples: Generate a plan adding an Entity and a FeatureView. - >>> from feast import FeatureStore, Entity, FeatureView, Feature, FileSource, RepoConfig + >>> from feast import FeatureStore, Entity, FeatureView, Feature, ValueType, FileSource, RepoConfig >>> from feast.feature_store import RepoContents >>> from datetime import timedelta >>> fs = FeatureStore(repo_path="feature_repo") - >>> driver = Entity(name="driver_id", description="driver id") + >>> driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id") >>> driver_hourly_stats = FileSource( ... path="feature_repo/data/driver_stats.parquet", ... timestamp_field="event_timestamp", @@ -525,7 +529,7 @@ def _plan( ... ) >>> driver_hourly_stats_view = FeatureView( ... name="driver_hourly_stats", - ... entities=[driver], + ... entities=["driver_id"], ... ttl=timedelta(seconds=86400 * 1), ... batch_source=driver_hourly_stats, ... ) @@ -625,10 +629,10 @@ def apply( Examples: Register an Entity and a FeatureView. - >>> from feast import FeatureStore, Entity, FeatureView, Feature, FileSource, RepoConfig + >>> from feast import FeatureStore, Entity, FeatureView, Feature, ValueType, FileSource, RepoConfig >>> from datetime import timedelta >>> fs = FeatureStore(repo_path="feature_repo") - >>> driver = Entity(name="driver_id", description="driver id") + >>> driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id") >>> driver_hourly_stats = FileSource( ... path="feature_repo/data/driver_stats.parquet", ... timestamp_field="event_timestamp", @@ -636,7 +640,7 @@ def apply( ... ) >>> driver_hourly_stats_view = FeatureView( ... name="driver_hourly_stats", - ... entities=[driver], + ... entities=["driver_id"], ... ttl=timedelta(seconds=86400 * 1), ... batch_source=driver_hourly_stats, ... ) @@ -683,9 +687,6 @@ def apply( data_sources_to_update = list(data_sources_set_to_update) - # Handle all entityless feature views by using DUMMY_ENTITY as a placeholder entity. - entities_to_update.append(DUMMY_ENTITY) - # Validate all feature views and make inferences. self._validate_all_feature_views( views_to_update, odfvs_to_update, request_views_to_update @@ -694,6 +695,9 @@ def apply( data_sources_to_update, entities_to_update, views_to_update, odfvs_to_update ) + # Handle all entityless feature views by using DUMMY_ENTITY as a placeholder entity. + entities_to_update.append(DUMMY_ENTITY) + # Add all objects to the registry and update the provider's infrastructure. for ds in data_sources_to_update: self._registry.apply_data_source(ds, project=self.project, commit=False) @@ -1537,12 +1541,12 @@ def _get_columnar_entity_values( def _get_entity_maps( self, feature_views ) -> Tuple[Dict[str, str], Dict[str, ValueType], Set[str]]: - # TODO(felixwang9817): Support entities that have different types for different feature views. entities = self._list_entities(allow_cache=True, hide_dummy_entity=False) entity_name_to_join_key_map: Dict[str, str] = {} entity_type_map: Dict[str, ValueType] = {} for entity in entities: entity_name_to_join_key_map[entity.name] = entity.join_key + entity_type_map[entity.name] = entity.value_type for feature_view in feature_views: for entity_name in feature_view.entities: entity = self._registry.get_entity( @@ -1557,11 +1561,7 @@ def _get_entity_maps( entity.join_key, entity.join_key ) entity_name_to_join_key_map[entity_name] = join_key - for entity_column in feature_view.entity_columns: - entity_type_map[ - entity_column.name - ] = entity_column.dtype.to_value_type() - + entity_type_map[join_key] = entity.value_type return ( entity_name_to_join_key_map, entity_type_map, diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index fa9cb03305..65a4914a8f 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -36,6 +36,7 @@ MaterializationInterval as MaterializationIntervalProto, ) from feast.usage import log_exceptions +from feast.value_type import ValueType warnings.simplefilter("once", DeprecationWarning) @@ -43,7 +44,9 @@ DUMMY_ENTITY_ID = "__dummy_id" DUMMY_ENTITY_NAME = "__dummy" DUMMY_ENTITY_VAL = "" -DUMMY_ENTITY = Entity(name=DUMMY_ENTITY_NAME, join_keys=[DUMMY_ENTITY_ID],) +DUMMY_ENTITY = Entity( + name=DUMMY_ENTITY_NAME, join_keys=[DUMMY_ENTITY_ID], value_type=ValueType.STRING, +) class FeatureView(BaseFeatureView): @@ -52,7 +55,7 @@ class FeatureView(BaseFeatureView): Attributes: name: The unique name of the feature view. - entities: The list of names of entities that this feature view is associated with. + entities: The list of entities with which this group of features is associated. ttl: The amount of time this group of features lives. A ttl of 0 indicates that this group of features lives forever. Note that large ttl's or a ttl of 0 can result in extremely computationally intensive queries. @@ -62,11 +65,9 @@ class FeatureView(BaseFeatureView): stream_source (optional): The stream source of data where this group of features is stored. This is deprecated in favor of `source`. schema: The schema of the feature view, including feature, timestamp, and entity - columns. If not specified, can be inferred from the underlying data source. - entity_columns: The list of entity columns contained in the schema. If not specified, - can be inferred from the underlying data source. - features: The list of feature columns contained in the schema. If not specified, - can be inferred from the underlying data source. + columns. + features: The list of features defined as part of this feature view. Each + feature should also be included in the schema. online: A boolean indicating whether online retrieval is enabled for this feature view. description: A human-readable description. @@ -83,7 +84,6 @@ class FeatureView(BaseFeatureView): batch_source: DataSource stream_source: Optional[DataSource] schema: List[Field] - entity_columns: List[Field] features: List[Field] online: bool description: str @@ -129,15 +129,14 @@ def __init__( owner (optional): The owner of the feature view, typically the email of the primary maintainer. schema (optional): The schema of the feature view, including feature, timestamp, - and entity columns. If entity columns are included in the schema, a List[Entity] - must be passed to `entities` instead of a List[str]; otherwise, the entity columns - will be mistakenly interpreted as feature columns. + and entity columns. source (optional): The source of data for this group of features. May be a stream source, or a batch source. If a stream source, the source should contain a batch_source for backfills & batch materialization. Raises: ValueError: A field mapping conflicts with an Entity or a Feature. """ + positional_attributes = ["name", "entities", "ttl"] _name = name @@ -168,21 +167,11 @@ def __init__( raise ValueError("feature view name needs to be specified") self.name = _name - self.entities = ( [e.name if isinstance(e, Entity) else e for e in _entities] if _entities else [DUMMY_ENTITY_NAME] ) - if _entities and isinstance(_entities[0], str): - warnings.warn( - ( - "The `entities` parameter should be a list of `Entity` objects. " - "Feast 0.22 and onwards will not support passing in a list of " - "strings to define entities." - ), - DeprecationWarning, - ) self._initialize_sources(_name, batch_source, stream_source, source) @@ -217,30 +206,14 @@ def __init__( _schema = [Field.from_feature(feature) for feature in features] self.schema = _schema - # If a user has added entity fields to schema, then they should also have switched - # to using a List[Entity], in which case entity and feature columns can be separated - # here. Conversely, if the user is still using a List[str], they must not have added - # added entity fields, in which case we can set the `features` attribute directly - # equal to the schema. - _features: List[Field] = [] - self.entity_columns = [] - if _entities and len(_entities) > 0 and isinstance(_entities[0], str): - _features = _schema - else: - join_keys = [] - if _entities: - for entity in _entities: - if isinstance(entity, Entity): - join_keys += entity.join_keys - - for field in _schema: - if field.name in join_keys: - self.entity_columns.append(field) - else: - _features.append(field) + # TODO(felixwang9817): Infer which fields in the schema are features, timestamps, + # and entities. For right now we assume that all fields are features, since the + # current `features` parameter only accepts feature columns. + _features = _schema - # TODO(felixwang9817): Add more robust validation of features. - cols = [field.name for field in _schema] + cols = [entity for entity in self.entities] + [ + field.name for field in _features + ] for col in cols: if ( self.batch_source.field_mapping is not None @@ -303,6 +276,7 @@ def __hash__(self): def __copy__(self): fv = FeatureView( name=self.name, + entities=self.entities, ttl=self.ttl, source=self.batch_source, stream_source=self.stream_source, @@ -310,13 +284,6 @@ def __copy__(self): tags=self.tags, online=self.online, ) - - # This is deliberately set outside of the FV initialization to avoid the deprecation warning. - # TODO(felixwang9817): Move this into the FV initialization when the deprecation warning - # is removed. - fv.entities = self.entities - fv.features = copy.copy(self.features) - fv.entity_columns = copy.copy(self.entity_columns) fv.projection = copy.copy(self.projection) return fv @@ -335,16 +302,12 @@ def __eq__(self, other): or self.online != other.online or self.batch_source != other.batch_source or self.stream_source != other.stream_source + or self.schema != other.schema ): return False return True - @property - def join_keys(self) -> List[str]: - """Returns a list of all the join keys.""" - return [entity.name for entity in self.entity_columns] - def ensure_valid(self): """ Validates the state of this feature view locally. @@ -430,8 +393,7 @@ def to_proto(self) -> FeatureViewProto: spec = FeatureViewSpecProto( name=self.name, entities=self.entities, - entity_columns=[field.to_proto() for field in self.entity_columns], - features=[field.to_proto() for field in self.features], + features=[field.to_proto() for field in self.schema], description=self.description, tags=self.tags, owner=self.owner, @@ -462,7 +424,11 @@ def from_proto(cls, feature_view_proto: FeatureViewProto): ) feature_view = cls( name=feature_view_proto.spec.name, - entities=feature_view_proto.spec.entities, + entities=[entity for entity in feature_view_proto.spec.entities], + schema=[ + Field.from_proto(field_proto) + for field_proto in feature_view_proto.spec.features + ], description=feature_view_proto.spec.description, tags=dict(feature_view_proto.spec.tags), owner=feature_view_proto.spec.owner, @@ -477,16 +443,6 @@ def from_proto(cls, feature_view_proto: FeatureViewProto): if stream_source: feature_view.stream_source = stream_source - # Instead of passing in a schema, we set the features and entity columns. - feature_view.features = [ - Field.from_proto(field_proto) - for field_proto in feature_view_proto.spec.features - ] - feature_view.entity_columns = [ - Field.from_proto(field_proto) - for field_proto in feature_view_proto.spec.entity_columns - ] - # FeatureViewProjections are not saved in the FeatureView proto. # Create the default projection. feature_view.projection = FeatureViewProjection.from_definition(feature_view) diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py index 9ca6e9a988..076cbc86ce 100644 --- a/sdk/python/feast/inference.py +++ b/sdk/python/feast/inference.py @@ -4,13 +4,70 @@ from feast import BigQuerySource, Entity, FileSource, RedshiftSource, SnowflakeSource from feast.data_source import DataSource, PushSource, RequestSource from feast.errors import RegistryInferenceFailure -from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_NAME, FeatureView +from feast.feature_view import FeatureView from feast.field import Field, from_value_type from feast.repo_config import RepoConfig -from feast.types import String from feast.value_type import ValueType +def update_entities_with_inferred_types_from_feature_views( + entities: List[Entity], feature_views: List[FeatureView], config: RepoConfig +) -> None: + """ + Infers the types of the entities by examining the schemas of feature view batch sources. + + Args: + entities: The entities to be updated. + feature_views: A list containing feature views associated with the entities. + config: The config for the current feature store. + """ + incomplete_entities = { + entity.name: entity + for entity in entities + if entity.value_type == ValueType.UNKNOWN + } + incomplete_entities_keys = incomplete_entities.keys() + + for view in feature_views: + if not (incomplete_entities_keys & set(view.entities)): + continue # skip if view doesn't contain any entities that need inference + + col_names_and_types = list( + view.batch_source.get_table_column_names_and_types(config) + ) + for entity_name in view.entities: + if entity_name in incomplete_entities: + entity = incomplete_entities[entity_name] + + # get entity information from information extracted from the view batch source + extracted_entity_name_type_pairs = list( + filter(lambda tup: tup[0] == entity.join_key, col_names_and_types,) + ) + if len(extracted_entity_name_type_pairs) == 0: + # Doesn't mention inference error because would also be an error without inferencing + raise ValueError( + f"""No column in the batch source for the {view.name} feature view matches + its entity's name.""" + ) + + inferred_value_type = view.batch_source.source_datatype_to_feast_value_type()( + extracted_entity_name_type_pairs[0][1] + ) + + if ( + entity.value_type != ValueType.UNKNOWN + and entity.value_type != inferred_value_type + ) or (len(extracted_entity_name_type_pairs) > 1): + raise RegistryInferenceFailure( + "Entity", + f"""Entity value_type inference failed for {entity_name} entity. + Multiple viable matches. + """, + ) + + entity.value_type = inferred_value_type + + def update_data_sources_with_inferred_event_timestamp_col( data_sources: List[DataSource], config: RepoConfig ) -> None: @@ -83,115 +140,57 @@ def update_data_sources_with_inferred_event_timestamp_col( ) -def update_feature_views_with_inferred_features_and_entities( +def update_feature_views_with_inferred_features( fvs: List[FeatureView], entities: List[Entity], config: RepoConfig ) -> None: """ - Infers the set of features and entities associated with each feature view and updates - the feature view with those features and entities. Columns whose names match a join key - of an entity are considered to be entity columns; all other columns except designated - timestamp columns, are considered to be feature columns. + Infers the set of features associated to each FeatureView and updates the FeatureView with those features. + Inference occurs through considering each column of the underlying data source as a feature except columns that are + associated with the data source's timestamp columns and the FeatureView's entity columns. Args: fvs: The feature views to be updated. entities: A list containing entities associated with the feature views. config: The config for the current feature store. """ - entity_name_to_entity_map = {e.name: e for e in entities} - entity_name_to_join_keys_map = {e.name: e.join_keys for e in entities} - all_join_keys = [ - join_key - for join_key_list in entity_name_to_join_keys_map.values() - for join_key in join_key_list - ] + entity_name_to_join_key_map = {entity.name: entity.join_key for entity in entities} + join_keys = entity_name_to_join_key_map.values() for fv in fvs: - # Fields whose names match a join key are considered to be entity columns; all - # other fields are considered to be feature columns. - for field in fv.schema: - if field.name in all_join_keys: - # Do not override a preexisting field with the same name. - if field.name not in [ - entity_column.name for entity_column in fv.entity_columns - ]: - fv.entity_columns.append(field) - else: - if field.name not in [feature.name for feature in fv.features]: - fv.features.append(field) - - # Since the `value_type` parameter has not yet been fully deprecated for - # entities, we respect the `value_type` attribute if it still exists. - for entity_name in fv.entities: - entity = entity_name_to_entity_map[entity_name] - if ( - entity_name - not in [entity_column.name for entity_column in fv.entity_columns] - and entity.value_type != ValueType.UNKNOWN - ): - fv.entity_columns.append( - Field( - name=entity.join_key, dtype=from_value_type(entity.value_type), - ) - ) - - # Handle EFV separately here. Specifically, that means if we have an EFV, - # we need to add a field to entity_columns. - if len(fv.entities) == 1 and fv.entities[0] == DUMMY_ENTITY_NAME: - fv.entity_columns.append(Field(name=DUMMY_ENTITY_ID, dtype=String)) - - # Run inference if either (a) there are fewer entity fields than expected or - # (b) there are no feature fields. - run_inference = len(fv.features) == 0 - num_expected_join_keys = sum( - [ - len(entity_name_to_join_keys_map[entity_name]) - for entity_name in fv.entities - ] - ) - if len(fv.entity_columns) < num_expected_join_keys: - run_inference = True - - if run_inference: - join_keys = set( - [ - join_key - for entity_name in fv.entities - for join_key in entity_name_to_join_keys_map[entity_name] - ] - ) + # First drop all Entity fields. Then infer features if necessary. + fv.schema = [field for field in fv.schema if field.name not in join_keys] + fv.features = [field for field in fv.features if field.name not in join_keys] + if not fv.features: columns_to_exclude = { fv.batch_source.timestamp_field, fv.batch_source.created_timestamp_column, + } | { + entity_name_to_join_key_map[entity_name] for entity_name in fv.entities } - for column in columns_to_exclude: - if column in fv.batch_source.field_mapping: - columns_to_exclude.remove(column) - columns_to_exclude.add(fv.batch_source.field_mapping[column]) - table_column_names_and_types = fv.batch_source.get_table_column_names_and_types( - config - ) + if fv.batch_source.timestamp_field in fv.batch_source.field_mapping: + columns_to_exclude.add( + fv.batch_source.field_mapping[fv.batch_source.timestamp_field] + ) + if ( + fv.batch_source.created_timestamp_column + in fv.batch_source.field_mapping + ): + columns_to_exclude.add( + fv.batch_source.field_mapping[ + fv.batch_source.created_timestamp_column + ] + ) - for col_name, col_datatype in table_column_names_and_types: - if col_name in columns_to_exclude: - continue - elif col_name in join_keys: - field = Field( - name=col_name, - dtype=from_value_type( - fv.batch_source.source_datatype_to_feast_value_type()( - col_datatype - ) - ), - ) - if field.name not in [ - entity_column.name for entity_column in fv.entity_columns - ]: - fv.entity_columns.append(field) - elif not re.match( - "^__|__$", col_name - ): # double underscores often signal an internal-use column + for ( + col_name, + col_datatype, + ) in fv.batch_source.get_table_column_names_and_types(config): + if col_name not in columns_to_exclude and not re.match( + "^__|__$", + col_name, # double underscores often signal an internal-use column + ): feature_name = ( fv.batch_source.field_mapping[col_name] if col_name in fv.batch_source.field_mapping @@ -205,8 +204,10 @@ def update_feature_views_with_inferred_features_and_entities( ) ), ) - if field.name not in [feature.name for feature in fv.features]: - fv.features.append(field) + # Note that schema and features are two different attributes of a + # FeatureView, and that features should be present in both. + fv.schema.append(field) + fv.features.append(field) if not fv.features: raise RegistryInferenceFailure( diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index 7cc6dbd283..2dea5714fa 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -200,9 +200,10 @@ def evaluate_historical_retrieval(): # Build a list of entity columns to join on (from the right table) join_keys = [] - for entity_column in feature_view.entity_columns: + for entity_name in feature_view.entities: + entity = registry.get_entity(entity_name, project) join_key = feature_view.projection.join_key_map.get( - entity_column.name, entity_column.name + entity.join_key, entity.join_key ) join_keys.append(join_key) diff --git a/sdk/python/feast/infra/offline_stores/offline_utils.py b/sdk/python/feast/infra/offline_stores/offline_utils.py index dad0ca5b78..b6c3d300d4 100644 --- a/sdk/python/feast/infra/offline_stores/offline_utils.py +++ b/sdk/python/feast/infra/offline_stores/offline_utils.py @@ -60,9 +60,11 @@ def get_expected_join_keys( ) -> Set[str]: join_keys = set() for feature_view in feature_views: - for entity_column in feature_view.entity_columns: + entities = feature_view.entities + for entity_name in entities: + entity = registry.get_entity(entity_name, project) join_key = feature_view.projection.join_key_map.get( - entity_column.name, entity_column.name + entity.join_key, entity.join_key ) join_keys.add(join_key) return join_keys @@ -112,14 +114,14 @@ def get_feature_view_query_context( query_context = [] for feature_view, features in feature_views_to_feature_map.items(): - join_keys: List[str] = [] - entity_selections: List[str] = [] - for entity_column in feature_view.entity_columns: + join_keys, entity_selections = [], [] + for entity_name in feature_view.entities: + entity = registry.get_entity(entity_name, project) join_key = feature_view.projection.join_key_map.get( - entity_column.name, entity_column.name + entity.join_key, entity.join_key ) join_keys.append(join_key) - entity_selections.append(f"{entity_column.name} AS {join_key}") + entity_selections.append(f"{entity.join_key} AS {join_key}") if isinstance(feature_view.ttl, timedelta): ttl_seconds = int(feature_view.ttl.total_seconds()) diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index 1a8eedb21e..9ceceff0ac 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -106,9 +106,9 @@ def update( (usually this happens when the last feature view that was using specific compound key is deleted) and remove all features attached to this "join_keys". """ - join_keys_to_keep = set(tuple(table.join_keys) for table in tables_to_keep) + join_keys_to_keep = set(tuple(table.entities) for table in tables_to_keep) - join_keys_to_delete = set(tuple(table.join_keys) for table in tables_to_delete) + join_keys_to_delete = set(tuple(table.entities) for table in tables_to_delete) for join_keys in join_keys_to_delete - join_keys_to_keep: self.delete_entity_values(config, list(join_keys)) @@ -122,7 +122,7 @@ def teardown( """ We delete the keys in redis for tables/views being removed. """ - join_keys_to_delete = set(tuple(table.join_keys) for table in tables) + join_keys_to_delete = set(tuple(table.entities) for table in tables) for join_keys in join_keys_to_delete: self.delete_entity_values(config, list(join_keys)) diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index b663506507..0b6b798fe0 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -159,16 +159,11 @@ def materialize_single_feature_view( if feature_view.batch_source.field_mapping is not None: table = _run_field_mapping(table, feature_view.batch_source.field_mapping) - join_key_to_value_type = { - entity.name: entity.dtype.to_value_type() - for entity in feature_view.entity_columns - } + join_keys = {entity.join_key: entity.value_type for entity in entities} with tqdm_builder(table.num_rows) as pbar: for batch in table.to_batches(DEFAULT_BATCH_SIZE): - rows_to_write = _convert_arrow_to_proto( - batch, feature_view, join_key_to_value_type - ) + rows_to_write = _convert_arrow_to_proto(batch, feature_view, join_keys) self.online_write_batch( self.repo_config, feature_view, diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index cd82b7d416..7754a58319 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -390,7 +390,7 @@ def _convert_arrow_to_proto( table = table.to_batches()[0] columns = [ - (field.name, field.dtype.to_value_type()) for field in feature_view.features + (field.name, field.dtype.to_value_type()) for field in feature_view.schema ] + list(join_keys.items()) proto_values_by_column = { diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 2d41de5291..d2cec18e52 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -647,7 +647,7 @@ def decorator(user_function): def feature_view_to_batch_feature_view(fv: FeatureView) -> BatchFeatureView: - bfv = BatchFeatureView( + return BatchFeatureView( name=fv.name, entities=fv.entities, ttl=fv.ttl, @@ -657,7 +657,3 @@ def feature_view_to_batch_feature_view(fv: FeatureView) -> BatchFeatureView: schema=fv.schema, source=fv.source, ) - - bfv.features = copy.copy(fv.features) - bfv.entities = copy.copy(fv.entities) - return bfv diff --git a/sdk/python/feast/templates/aws/driver_repo.py b/sdk/python/feast/templates/aws/driver_repo.py index a6f31f07b4..5188f57cf8 100644 --- a/sdk/python/feast/templates/aws/driver_repo.py +++ b/sdk/python/feast/templates/aws/driver_repo.py @@ -1,6 +1,6 @@ from datetime import timedelta -from feast import Entity, FeatureView, Field, RedshiftSource +from feast import Entity, FeatureView, Field, RedshiftSource, ValueType from feast.types import Float32, Int64 # Define an entity for the driver. Entities can be thought of as primary keys used to @@ -13,6 +13,8 @@ # features can be looked up. The join keys are also used to join feature # tables/views when building feature vectors join_keys=["driver_id"], + # The storage level type for an entity + value_type=ValueType.INT64, ) # Indicates a data source from which feature values can be retrieved. Sources are queried when building training @@ -39,7 +41,7 @@ # The list of entities specifies the keys required for joining or looking # up features from this feature view. The reference provided in this field # correspond to the name of a defined entity (or entities) - entities=[driver], + entities=["driver"], # The timedelta is the maximum age that each feature value may have # relative to its lookup time. For historical features (used in training), # TTL is relative to each timestamp provided in the entity dataframe. diff --git a/sdk/python/feast/templates/gcp/driver_repo.py b/sdk/python/feast/templates/gcp/driver_repo.py index b3bd868f14..7d137f996b 100644 --- a/sdk/python/feast/templates/gcp/driver_repo.py +++ b/sdk/python/feast/templates/gcp/driver_repo.py @@ -1,6 +1,6 @@ from datetime import timedelta -from feast import BigQuerySource, Entity, FeatureView, Field +from feast import BigQuerySource, Entity, FeatureView, Field, ValueType from feast.types import Float32, Int64 # Define an entity for the driver. Entities can be thought of as primary keys used to @@ -13,6 +13,8 @@ # features can be looked up. The join keys are also used to join feature # tables/views when building feature vectors join_keys=["driver_id"], + # The storage level type for an entity + value_type=ValueType.INT64, ) # Indicates a data source from which feature values can be retrieved. Sources are queried when building training @@ -37,7 +39,7 @@ # The list of entities specifies the keys required for joining or looking # up features from this feature view. The reference provided in this field # correspond to the name of a defined entity (or entities) - entities=[driver], + entities=["driver"], # The timedelta is the maximum age that each feature value may have # relative to its lookup time. For historical features (used in training), # TTL is relative to each timestamp provided in the entity dataframe. diff --git a/sdk/python/feast/templates/hbase/example.py b/sdk/python/feast/templates/hbase/example.py index b34696185b..1d441e0e99 100644 --- a/sdk/python/feast/templates/hbase/example.py +++ b/sdk/python/feast/templates/hbase/example.py @@ -2,7 +2,7 @@ from datetime import timedelta -from feast import Entity, FeatureView, Field, FileSource +from feast import Entity, FeatureView, Field, FileSource, ValueType from feast.types import Float32, Int64 # Read data from parquet files. Parquet is convenient for local development mode. For @@ -16,7 +16,7 @@ # Define an entity for the driver. You can think of entity as a primary key used to # fetch features. -driver = Entity(name="driver", join_keys=["driver_id"]) +driver = Entity(name="driver", join_keys=["driver_id"], value_type=ValueType.INT64,) # Our parquet files contain sample data that includes a driver_id column, timestamps and # three feature column. Here we define a Feature View that will allow us to serve this diff --git a/sdk/python/feast/templates/local/example.py b/sdk/python/feast/templates/local/example.py index 26821b8a93..1d441e0e99 100644 --- a/sdk/python/feast/templates/local/example.py +++ b/sdk/python/feast/templates/local/example.py @@ -2,7 +2,7 @@ from datetime import timedelta -from feast import Entity, FeatureView, Field, FileSource +from feast import Entity, FeatureView, Field, FileSource, ValueType from feast.types import Float32, Int64 # Read data from parquet files. Parquet is convenient for local development mode. For @@ -16,14 +16,14 @@ # Define an entity for the driver. You can think of entity as a primary key used to # fetch features. -driver = Entity(name="driver", join_keys=["driver_id"]) +driver = Entity(name="driver", join_keys=["driver_id"], value_type=ValueType.INT64,) # Our parquet files contain sample data that includes a driver_id column, timestamps and # three feature column. Here we define a Feature View that will allow us to serve this # data to our model online. driver_hourly_stats_view = FeatureView( name="driver_hourly_stats", - entities=[driver], + entities=["driver"], ttl=timedelta(days=1), schema=[ Field(name="conv_rate", dtype=Float32), diff --git a/sdk/python/feast/templates/postgres/driver_repo.py b/sdk/python/feast/templates/postgres/driver_repo.py index 4096943bb7..34bc0022e2 100644 --- a/sdk/python/feast/templates/postgres/driver_repo.py +++ b/sdk/python/feast/templates/postgres/driver_repo.py @@ -18,7 +18,7 @@ driver_stats_fv = FeatureView( name="driver_hourly_stats", - entities=[driver], + entities=["driver_id"], ttl=timedelta(weeks=52), schema=[ Field(name="conv_rate", dtype=Float32), diff --git a/sdk/python/feast/templates/snowflake/driver_repo.py b/sdk/python/feast/templates/snowflake/driver_repo.py index 873e2b163e..ecccb9863b 100644 --- a/sdk/python/feast/templates/snowflake/driver_repo.py +++ b/sdk/python/feast/templates/snowflake/driver_repo.py @@ -43,7 +43,7 @@ # The list of entities specifies the keys required for joining or looking # up features from this feature view. The reference provided in this field # correspond to the name of a defined entity (or entities) - entities=[driver], + entities=["driver"], # The timedelta is the maximum age that each feature value may have # relative to its lookup time. For historical features (used in training), # TTL is relative to each timestamp provided in the entity dataframe. diff --git a/sdk/python/feast/templates/spark/example.py b/sdk/python/feast/templates/spark/example.py index 41f86ef3c1..58f3df740f 100644 --- a/sdk/python/feast/templates/spark/example.py +++ b/sdk/python/feast/templates/spark/example.py @@ -5,7 +5,7 @@ from datetime import timedelta from pathlib import Path -from feast import Entity, FeatureView, Field +from feast import Entity, FeatureView, Field, ValueType from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import ( SparkSource, ) @@ -16,8 +16,10 @@ # Entity definitions -driver = Entity(name="driver", description="driver id",) -customer = Entity(name="customer", description="customer id",) +driver = Entity(name="driver", value_type=ValueType.INT64, description="driver id",) +customer = Entity( + name="customer", value_type=ValueType.INT64, description="customer id", +) # Sources driver_hourly_stats = SparkSource( @@ -38,7 +40,7 @@ # Feature Views driver_hourly_stats_view = FeatureView( name="driver_hourly_stats", - entities=[driver], + entities=["driver"], ttl=timedelta(days=7), schema=[ Field(name="conv_rate", dtype=Float32), @@ -51,7 +53,7 @@ ) customer_daily_profile_view = FeatureView( name="customer_daily_profile", - entities=[customer], + entities=["customer"], ttl=timedelta(days=7), schema=[ Field(name="current_balance", dtype=Float32), diff --git a/sdk/python/tests/data/data_creator.py b/sdk/python/tests/data/data_creator.py index 186c39b9ef..e08597b67b 100644 --- a/sdk/python/tests/data/data_creator.py +++ b/sdk/python/tests/data/data_creator.py @@ -4,11 +4,11 @@ import pandas as pd from pytz import timezone, utc -from feast.types import FeastType, Float32, Int32, Int64, String +from feast.value_type import ValueType def create_dataset( - entity_type: FeastType = Int32, + entity_type: ValueType = ValueType.INT32, feature_dtype: str = None, feature_is_list: bool = False, list_has_empty_list: bool = False, @@ -16,7 +16,7 @@ def create_dataset( now = datetime.utcnow().replace(microsecond=0, second=0, minute=0) ts = pd.Timestamp(now).round("ms") data = { - "driver_id": get_entities_for_feast_type(entity_type), + "driver_id": get_entities_for_value_type(entity_type), "value": get_feature_values_for_dtype( feature_dtype, feature_is_list, list_has_empty_list ), @@ -37,14 +37,14 @@ def create_dataset( return pd.DataFrame.from_dict(data) -def get_entities_for_feast_type(feast_type: FeastType) -> List: - feast_type_map: Dict[FeastType, List] = { - Int32: [1, 2, 1, 3, 3], - Int64: [1, 2, 1, 3, 3], - Float32: [1.0, 2.0, 1.0, 3.0, 3.0], - String: ["1", "2", "1", "3", "3"], +def get_entities_for_value_type(value_type: ValueType) -> List: + value_type_map: Dict[ValueType, List] = { + ValueType.INT32: [1, 2, 1, 3, 3], + ValueType.INT64: [1, 2, 1, 3, 3], + ValueType.FLOAT: [1.0, 2.0, 1.0, 3.0, 3.0], + ValueType.STRING: ["1", "2", "1", "3", "3"], } - return feast_type_map[feast_type] + return value_type_map[value_type] def get_feature_values_for_dtype( diff --git a/sdk/python/tests/doctest/test_all.py b/sdk/python/tests/doctest/test_all.py index 31f181ad53..65d5f3da28 100644 --- a/sdk/python/tests/doctest/test_all.py +++ b/sdk/python/tests/doctest/test_all.py @@ -11,13 +11,15 @@ def setup_feature_store(): """Prepares the local environment for a FeatureStore docstring test.""" from datetime import datetime, timedelta - from feast import Entity, FeatureStore, FeatureView, Field, FileSource + from feast import Entity, FeatureStore, FeatureView, Field, FileSource, ValueType from feast.repo_operations import init_repo from feast.types import Float32, Int64 init_repo("feature_repo", "local") fs = FeatureStore(repo_path="feature_repo") - driver = Entity(name="driver_id", description="driver id",) + driver = Entity( + name="driver_id", value_type=ValueType.INT64, description="driver id", + ) driver_hourly_stats = FileSource( path="feature_repo/data/driver_stats.parquet", timestamp_field="event_timestamp", @@ -25,7 +27,7 @@ def setup_feature_store(): ) driver_hourly_stats_view = FeatureView( name="driver_hourly_stats", - entities=[driver], + entities=["driver_id"], ttl=timedelta(seconds=86400 * 1), schema=[ Field(name="conv_rate", dtype=Float32), diff --git a/sdk/python/tests/example_repos/example_feature_repo_1.py b/sdk/python/tests/example_repos/example_feature_repo_1.py index 8d6d96d9ef..d8b6d7c89b 100644 --- a/sdk/python/tests/example_repos/example_feature_repo_1.py +++ b/sdk/python/tests/example_repos/example_feature_repo_1.py @@ -1,6 +1,14 @@ from datetime import timedelta -from feast import BigQuerySource, Entity, FeatureService, FeatureView, Field, PushSource +from feast import ( + BigQuerySource, + Entity, + FeatureService, + FeatureView, + Field, + PushSource, + ValueType, +) from feast.types import Float32, Int64, String driver_locations_source = BigQuerySource( @@ -38,24 +46,22 @@ driver = Entity( name="driver", # The name is derived from this argument, not object name. join_keys=["driver_id"], + value_type=ValueType.INT64, description="driver id", ) customer = Entity( name="customer", # The name is derived from this argument, not object name. join_keys=["customer_id"], + value_type=ValueType.STRING, ) driver_locations = FeatureView( name="driver_locations", - entities=[driver], + entities=["driver"], ttl=timedelta(days=1), - schema=[ - Field(name="lat", dtype=Float32), - Field(name="lon", dtype=String), - Field(name="driver_id", dtype=Int64), - ], + schema=[Field(name="lat", dtype=Float32), Field(name="lon", dtype=String)], online=True, batch_source=driver_locations_source, tags={}, @@ -63,12 +69,11 @@ pushed_driver_locations = FeatureView( name="pushed_driver_locations", - entities=[driver], + entities=["driver"], ttl=timedelta(days=1), schema=[ Field(name="driver_lat", dtype=Float32), Field(name="driver_long", dtype=String), - Field(name="driver_id", dtype=Int64), ], online=True, stream_source=driver_locations_push_source, @@ -77,13 +82,12 @@ customer_profile = FeatureView( name="customer_profile", - entities=[customer], + entities=["customer"], ttl=timedelta(days=1), schema=[ Field(name="avg_orders_day", dtype=Float32), Field(name="name", dtype=String), Field(name="age", dtype=Int64), - Field(name="customer_id", dtype=String), ], online=True, batch_source=customer_profile_source, @@ -92,13 +96,9 @@ customer_driver_combined = FeatureView( name="customer_driver_combined", - entities=[customer, driver], + entities=["customer", "driver"], ttl=timedelta(days=1), - schema=[ - Field(name="trips", dtype=Int64), - Field(name="driver_id", dtype=Int64), - Field(name="customer_id", dtype=String), - ], + schema=[Field(name="trips", dtype=Int64)], online=True, batch_source=customer_driver_combined_source, tags={}, diff --git a/sdk/python/tests/example_repos/example_feature_repo_2.py b/sdk/python/tests/example_repos/example_feature_repo_2.py index 75e99aa4b9..1ca7cc3805 100644 --- a/sdk/python/tests/example_repos/example_feature_repo_2.py +++ b/sdk/python/tests/example_repos/example_feature_repo_2.py @@ -1,6 +1,6 @@ from datetime import timedelta -from feast import Entity, FeatureView, Field, FileSource +from feast import Entity, FeatureView, Field, FileSource, ValueType from feast.types import Float32, Int32, Int64 driver_hourly_stats = FileSource( @@ -9,18 +9,17 @@ created_timestamp_column="created", ) -driver = Entity(name="driver_id", description="driver id",) +driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",) driver_hourly_stats_view = FeatureView( name="driver_hourly_stats", - entities=[driver], + entities=["driver_id"], ttl=timedelta(days=1), schema=[ Field(name="conv_rate", dtype=Float32), Field(name="acc_rate", dtype=Float32), Field(name="avg_daily_trips", dtype=Int64), - Field(name="driver_id", dtype=Int32), ], online=True, batch_source=driver_hourly_stats, diff --git a/sdk/python/tests/example_repos/example_feature_repo_with_duplicated_featureview_names.py b/sdk/python/tests/example_repos/example_feature_repo_with_duplicated_featureview_names.py index 4b079999ed..cbcc3ad172 100644 --- a/sdk/python/tests/example_repos/example_feature_repo_with_duplicated_featureview_names.py +++ b/sdk/python/tests/example_repos/example_feature_repo_with_duplicated_featureview_names.py @@ -1,16 +1,14 @@ from datetime import timedelta -from feast import Entity, FeatureView, FileSource +from feast import FeatureView, FileSource driver_hourly_stats = FileSource( path="driver_stats.parquet", # this parquet is not real and will not be read ) -driver = Entity(name="driver_id", description="driver id", join_keys=["driver"],) - driver_hourly_stats_view = FeatureView( name="driver_hourly_stats", # Intentionally use the same FeatureView name - entities=[driver], + entities=["driver_id"], online=False, source=driver_hourly_stats, ttl=timedelta(days=1), @@ -19,7 +17,7 @@ driver_hourly_stats_view_dup1 = FeatureView( name="driver_hourly_stats", # Intentionally use the same FeatureView name - entities=[driver], + entities=["driver_id"], online=False, source=driver_hourly_stats, ttl=timedelta(days=1), diff --git a/sdk/python/tests/example_repos/example_feature_repo_with_entity_join_key.py b/sdk/python/tests/example_repos/example_feature_repo_with_entity_join_key.py index 4bc0923e19..ba18cf84ba 100644 --- a/sdk/python/tests/example_repos/example_feature_repo_with_entity_join_key.py +++ b/sdk/python/tests/example_repos/example_feature_repo_with_entity_join_key.py @@ -1,6 +1,6 @@ from datetime import timedelta -from feast import Entity, FeatureView, Field, FileSource +from feast import Entity, FeatureView, Field, FileSource, ValueType from feast.types import Float32, Int64 driver_hourly_stats = FileSource( @@ -11,12 +11,17 @@ # The join key here is deliberately different from the parquet file to test the failure path. -driver = Entity(name="driver_id", description="driver id", join_keys=["driver"],) +driver = Entity( + name="driver_id", + value_type=ValueType.INT64, + description="driver id", + join_keys=["driver"], +) driver_hourly_stats_view = FeatureView( name="driver_hourly_stats", - entities=[driver], + entities=["driver_id"], ttl=timedelta(days=1), schema=[ Field(name="conv_rate", dtype=Float32), diff --git a/sdk/python/tests/integration/e2e/test_usage_e2e.py b/sdk/python/tests/integration/e2e/test_usage_e2e.py index 53e4a32a82..12c1eb8628 100644 --- a/sdk/python/tests/integration/e2e/test_usage_e2e.py +++ b/sdk/python/tests/integration/e2e/test_usage_e2e.py @@ -19,7 +19,7 @@ import pytest -from feast import Entity, RepoConfig +from feast import Entity, RepoConfig, ValueType from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig @@ -60,6 +60,7 @@ def test_usage_on(dummy_exporter, enabling_toggle): entity = Entity( name="driver_car_id", description="Car driver id", + value_type=ValueType.STRING, tags={"team": "matchmaking"}, ) @@ -98,6 +99,7 @@ def test_usage_off(dummy_exporter, enabling_toggle): entity = Entity( name="driver_car_id", description="Car driver id", + value_type=ValueType.STRING, tags={"team": "matchmaking"}, ) test_feature_store.apply([entity]) diff --git a/sdk/python/tests/integration/feature_repos/universal/entities.py b/sdk/python/tests/integration/feature_repos/universal/entities.py index 66989b0646..b7a7583f1b 100644 --- a/sdk/python/tests/integration/feature_repos/universal/entities.py +++ b/sdk/python/tests/integration/feature_repos/universal/entities.py @@ -1,21 +1,22 @@ -from feast import Entity +from feast import Entity, ValueType -def driver(): +def driver(value_type: ValueType = ValueType.INT64): return Entity( name="driver", # The name is derived from this argument, not object name. + value_type=value_type, description="driver id", join_keys=["driver_id"], ) def customer(): - return Entity(name="customer_id") + return Entity(name="customer_id", value_type=ValueType.INT64) def location(): - return Entity(name="location_id") + return Entity(name="location_id", value_type=ValueType.INT64) def item(): - return Entity(name="item_id") + return Entity(name="item_id", value_type=ValueType.INT64) diff --git a/sdk/python/tests/integration/feature_repos/universal/feature_views.py b/sdk/python/tests/integration/feature_repos/universal/feature_views.py index b93ad987fa..26c2513995 100644 --- a/sdk/python/tests/integration/feature_repos/universal/feature_views.py +++ b/sdk/python/tests/integration/feature_repos/universal/feature_views.py @@ -14,13 +14,8 @@ ValueType, ) from feast.data_source import DataSource, RequestSource -from feast.types import Array, FeastType, Float32, Float64, Int32, Int64 -from tests.integration.feature_repos.universal.entities import ( - customer, - driver, - item, - location, -) +from feast.types import Array, FeastType, Float32, Float64, Int32 +from tests.integration.feature_repos.universal.entities import location def driver_feature_view( @@ -28,14 +23,11 @@ def driver_feature_view( name="test_correctness", infer_features: bool = False, dtype: FeastType = Float32, - entity_type: FeastType = Int64, ) -> FeatureView: - d = driver() return FeatureView( name=name, - entities=[d], - schema=[Field(name=d.join_key, dtype=entity_type)] - + ([] if infer_features else [Field(name="value", dtype=dtype)]), + entities=["driver"], + schema=None if infer_features else [Field(name="value", dtype=dtype)], ttl=timedelta(days=5), source=data_source, ) @@ -146,7 +138,7 @@ def create_similarity_request_source(): def create_item_embeddings_feature_view(source, infer_features: bool = False): item_embeddings_feature_view = FeatureView( name="item_embeddings", - entities=[item()], + entities=["item"], schema=None if infer_features else [ @@ -164,7 +156,7 @@ def create_item_embeddings_batch_feature_view( ) -> BatchFeatureView: item_embeddings_feature_view = BatchFeatureView( name="item_embeddings", - entities=[item()], + entities=["item"], schema=None if infer_features else [ @@ -178,19 +170,15 @@ def create_item_embeddings_batch_feature_view( def create_driver_hourly_stats_feature_view(source, infer_features: bool = False): - # TODO(felixwang9817): Figure out why not adding an entity field here - # breaks type tests. - d = driver() driver_stats_feature_view = FeatureView( name="driver_stats", - entities=[d], + entities=["driver"], schema=None if infer_features else [ Field(name="conv_rate", dtype=Float32), Field(name="acc_rate", dtype=Float32), Field(name="avg_daily_trips", dtype=Int32), - Field(name=d.join_key, dtype=Int64), ], source=source, ttl=timedelta(hours=2), @@ -203,7 +191,7 @@ def create_driver_hourly_stats_batch_feature_view( ) -> BatchFeatureView: driver_stats_feature_view = BatchFeatureView( name="driver_stats", - entities=[driver()], + entities=["driver"], schema=None if infer_features else [ @@ -220,7 +208,7 @@ def create_driver_hourly_stats_batch_feature_view( def create_customer_daily_profile_feature_view(source, infer_features: bool = False): customer_profile_feature_view = FeatureView( name="customer_profile", - entities=[customer()], + entities=["customer_id"], schema=None if infer_features else [ @@ -254,13 +242,10 @@ def create_global_stats_feature_view(source, infer_features: bool = False): def create_order_feature_view(source, infer_features: bool = False): return FeatureView( name="order", - entities=[customer(), driver()], + entities=["driver", "customer_id"], schema=None if infer_features - else [ - Field(name="order_is_success", dtype=Int32), - Field(name="driver_id", dtype=Int64), - ], + else [Field(name="order_is_success", dtype=Int32)], source=source, ttl=timedelta(days=2), ) @@ -270,12 +255,7 @@ def create_location_stats_feature_view(source, infer_features: bool = False): location_stats_feature_view = FeatureView( name="location_stats", entities=[location()], - schema=None - if infer_features - else [ - Field(name="temperature", dtype=Int32), - Field(name="location_id", dtype=Int64), - ], + schema=None if infer_features else [Field(name="temperature", dtype=Int32)], source=source, ttl=timedelta(days=2), ) @@ -299,11 +279,9 @@ def create_pushable_feature_view(batch_source: DataSource): ) return FeatureView( name="pushable_location_stats", - entities=[location()], - schema=[ - Field(name="temperature", dtype=Int32), - Field(name="location_id", dtype=Int64), - ], + entities=["location_id"], + # Test that Features still work for FeatureViews. + features=[Feature(name="temperature", dtype=ValueType.INT32)], ttl=timedelta(days=2), source=push_source, ) diff --git a/sdk/python/tests/integration/offline_store/test_feature_logging.py b/sdk/python/tests/integration/offline_store/test_feature_logging.py index 8e6be052c8..6dda2e63a9 100644 --- a/sdk/python/tests/integration/offline_store/test_feature_logging.py +++ b/sdk/python/tests/integration/offline_store/test_feature_logging.py @@ -26,11 +26,7 @@ UniversalDatasets, construct_universal_feature_views, ) -from tests.integration.feature_repos.universal.entities import ( - customer, - driver, - location, -) +from tests.integration.feature_repos.universal.entities import driver from tests.integration.feature_repos.universal.feature_views import conv_rate_plus_100 @@ -43,7 +39,7 @@ def test_feature_service_logging(environment, universal_data_sources, pass_as_pa (_, datasets, data_sources) = universal_data_sources feature_views = construct_universal_feature_views(data_sources) - store.apply([customer(), driver(), location(), *feature_views.values()]) + store.apply([driver(), *feature_views.values()]) logs_df = prepare_logs(datasets) diff --git a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py index c99d19b096..b62f7cda24 100644 --- a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py +++ b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py @@ -22,6 +22,7 @@ DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL, ) from feast.types import Int32 +from feast.value_type import ValueType from tests.integration.feature_repos.repo_configuration import ( construct_universal_feature_views, table_name_from_data_source, @@ -688,10 +689,10 @@ def test_historical_features_from_bigquery_sources_containing_backfills(environm created_timestamp_column="created", ) - driver = Entity(name="driver", join_keys=["driver_id"]) + driver = Entity(name="driver", join_keys=["driver_id"], value_type=ValueType.INT64) driver_fv = FeatureView( name="driver_stats", - entities=[driver], + entities=["driver"], schema=[Field(name="avg_daily_trips", dtype=Int32)], batch_source=driver_stats_data_source, ttl=None, diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index 2af5fe543d..f4440dbfbc 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -13,7 +13,7 @@ import requests from botocore.exceptions import BotoCoreError -from feast import Entity, FeatureService, FeatureView, Field +from feast import Entity, FeatureService, FeatureView, Field, ValueType from feast.errors import ( FeatureNameCollisionError, RequestDataNotFoundInEntityRowsException, @@ -115,13 +115,13 @@ def test_write_to_online_store_event_check(local_redis_environment): } dataframe_source = pd.DataFrame(data) with prep_file_source(df=dataframe_source, timestamp_field="ts_1") as file_source: - e = Entity(name="id") + e = Entity(name="id", value_type=ValueType.STRING) # Create Feature View fv1 = FeatureView( name="feature_view_123", schema=[Field(name="string_col", dtype=String)], - entities=[e], + entities=["id"], batch_source=file_source, ttl=timedelta(minutes=5), ) diff --git a/sdk/python/tests/integration/registration/test_feature_store.py b/sdk/python/tests/integration/registration/test_feature_store.py index 88a4b9f249..db4c6700ce 100644 --- a/sdk/python/tests/integration/registration/test_feature_store.py +++ b/sdk/python/tests/integration/registration/test_feature_store.py @@ -27,8 +27,10 @@ from feast.infra.offline_stores.file import FileOfflineStoreConfig from feast.infra.online_stores.dynamodb import DynamoDBOnlineStoreConfig from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig +from feast.protos.feast.types import Value_pb2 as ValueProto from feast.repo_config import RepoConfig from feast.types import Array, Bytes, Float64, Int64, String +from feast.value_type import ValueType from tests.utils.data_source_utils import ( prep_file_source, simple_bq_source_using_query_arg, @@ -91,7 +93,10 @@ def feature_store_with_s3_registry(): ) def test_apply_entity_success(test_feature_store): entity = Entity( - name="driver_car_id", description="Car driver id", tags={"team": "matchmaking"}, + name="driver_car_id", + description="Car driver id", + value_type=ValueType.STRING, + tags={"team": "matchmaking"}, ) # Register Entity @@ -103,6 +108,7 @@ def test_apply_entity_success(test_feature_store): assert ( len(entities) == 1 and entity.name == "driver_car_id" + and entity.value_type == ValueType(ValueProto.ValueType.STRING) and entity.description == "Car driver id" and "team" in entity.tags and entity.tags["team"] == "matchmaking" @@ -121,7 +127,10 @@ def test_apply_entity_success(test_feature_store): ) def test_apply_entity_integration(test_feature_store): entity = Entity( - name="driver_car_id", description="Car driver id", tags={"team": "matchmaking"}, + name="driver_car_id", + description="Car driver id", + value_type=ValueType.STRING, + tags={"team": "matchmaking"}, ) # Register Entity @@ -133,6 +142,7 @@ def test_apply_entity_integration(test_feature_store): assert ( len(entities) == 1 and entity.name == "driver_car_id" + and entity.value_type == ValueType(ValueProto.ValueType.STRING) and entity.description == "Car driver id" and "team" in entity.tags and entity.tags["team"] == "matchmaking" @@ -141,6 +151,7 @@ def test_apply_entity_integration(test_feature_store): entity = test_feature_store.get_entity("driver_car_id") assert ( entity.name == "driver_car_id" + and entity.value_type == ValueType(ValueProto.ValueType.STRING) and entity.description == "Car driver id" and "team" in entity.tags and entity.tags["team"] == "matchmaking" @@ -162,8 +173,6 @@ def test_apply_feature_view_success(test_feature_store): date_partition_column="date_partition_col", ) - entity = Entity(name="fs1_my_entity_1", join_keys=["entity_id"]) - fv1 = FeatureView( name="my_feature_view_1", schema=[ @@ -171,16 +180,15 @@ def test_apply_feature_view_success(test_feature_store): Field(name="fs1_my_feature_2", dtype=String), Field(name="fs1_my_feature_3", dtype=Array(String)), Field(name="fs1_my_feature_4", dtype=Array(Bytes)), - Field(name="entity_id", dtype=Int64), ], - entities=[entity], + entities=["fs1_my_entity_1"], tags={"team": "matchmaking"}, batch_source=batch_source, ttl=timedelta(minutes=5), ) # Register Feature View - test_feature_store.apply([entity, fv1]) + test_feature_store.apply([fv1]) feature_views = test_feature_store.list_feature_views() @@ -209,11 +217,13 @@ def test_apply_feature_view_success(test_feature_store): @pytest.mark.parametrize("dataframe_source", [lazy_fixture("simple_dataset_1")]) def test_feature_view_inference_success(test_feature_store, dataframe_source): with prep_file_source(df=dataframe_source, timestamp_field="ts_1") as file_source: - entity = Entity(name="id", join_keys=["id_join_key"]) + entity = Entity( + name="id", join_keys=["id_join_key"], value_type=ValueType.INT64 + ) fv1 = FeatureView( name="fv1", - entities=[entity], + entities=["id"], ttl=timedelta(minutes=5), online=True, batch_source=file_source, @@ -222,7 +232,7 @@ def test_feature_view_inference_success(test_feature_store, dataframe_source): fv2 = FeatureView( name="fv2", - entities=[entity], + entities=["id"], ttl=timedelta(minutes=5), online=True, batch_source=simple_bq_source_using_table_arg(dataframe_source, "ts_1"), @@ -231,7 +241,7 @@ def test_feature_view_inference_success(test_feature_store, dataframe_source): fv3 = FeatureView( name="fv3", - entities=[entity], + entities=["id"], ttl=timedelta(minutes=5), online=True, batch_source=simple_bq_source_using_query_arg(dataframe_source, "ts_1"), @@ -286,8 +296,6 @@ def test_apply_feature_view_integration(test_feature_store): date_partition_column="date_partition_col", ) - entity = Entity(name="fs1_my_entity_1", join_keys=["test"]) - fv1 = FeatureView( name="my_feature_view_1", schema=[ @@ -295,16 +303,15 @@ def test_apply_feature_view_integration(test_feature_store): Field(name="fs1_my_feature_2", dtype=String), Field(name="fs1_my_feature_3", dtype=Array(String)), Field(name="fs1_my_feature_4", dtype=Array(Bytes)), - Field(name="test", dtype=Int64), ], - entities=[entity], + entities=["fs1_my_entity_1"], tags={"team": "matchmaking"}, batch_source=batch_source, ttl=timedelta(minutes=5), ) # Register Feature View - test_feature_store.apply([fv1, entity]) + test_feature_store.apply([fv1]) feature_views = test_feature_store.list_feature_views() @@ -357,9 +364,13 @@ def test_apply_object_and_read(test_feature_store): created_timestamp_column="timestamp", ) - e1 = Entity(name="fs1_my_entity_1", description="something") + e1 = Entity( + name="fs1_my_entity_1", value_type=ValueType.STRING, description="something" + ) - e2 = Entity(name="fs1_my_entity_2", description="something") + e2 = Entity( + name="fs1_my_entity_2", value_type=ValueType.STRING, description="something" + ) fv1 = FeatureView( name="my_feature_view_1", @@ -368,9 +379,8 @@ def test_apply_object_and_read(test_feature_store): Field(name="fs1_my_feature_2", dtype=String), Field(name="fs1_my_feature_3", dtype=Array(String)), Field(name="fs1_my_feature_4", dtype=Array(Bytes)), - Field(name="fs1_my_entity_1", dtype=Int64), ], - entities=[e1], + entities=["fs1_my_entity_1"], tags={"team": "matchmaking"}, batch_source=batch_source, ttl=timedelta(minutes=5), @@ -383,9 +393,8 @@ def test_apply_object_and_read(test_feature_store): Field(name="fs1_my_feature_2", dtype=String), Field(name="fs1_my_feature_3", dtype=Array(String)), Field(name="fs1_my_feature_4", dtype=Array(Bytes)), - Field(name="fs1_my_entity_2", dtype=Int64), ], - entities=[e2], + entities=["fs1_my_entity_1"], tags={"team": "matchmaking"}, batch_source=batch_source, ttl=timedelta(minutes=5), @@ -397,6 +406,7 @@ def test_apply_object_and_read(test_feature_store): fv1_actual = test_feature_store.get_feature_view("my_feature_view_1") e1_actual = test_feature_store.get_entity("fs1_my_entity_1") + assert fv1 == fv1_actual assert e1 == e1_actual assert fv2 != fv1_actual assert e2 != e1_actual @@ -424,13 +434,13 @@ def test_apply_remote_repo(): def test_reapply_feature_view_success(test_feature_store, dataframe_source): with prep_file_source(df=dataframe_source, timestamp_field="ts_1") as file_source: - e = Entity(name="id", join_keys=["id_join_key"]) + e = Entity(name="id", join_keys=["id_join_key"], value_type=ValueType.STRING) # Create Feature View fv1 = FeatureView( name="my_feature_view_1", schema=[Field(name="string_col", dtype=String)], - entities=[e], + entities=["id"], batch_source=file_source, ttl=timedelta(minutes=5), ) @@ -460,7 +470,7 @@ def test_reapply_feature_view_success(test_feature_store, dataframe_source): fv1 = FeatureView( name="my_feature_view_1", schema=[Field(name="int64_col", dtype=Int64)], - entities=[e], + entities=["id"], batch_source=file_source, ttl=timedelta(minutes=5), ) @@ -475,12 +485,10 @@ def test_reapply_feature_view_success(test_feature_store, dataframe_source): def test_apply_conflicting_featureview_names(feature_store_with_local_registry): """Test applying feature views with non-case-insensitively unique names""" - driver = Entity(name="driver", join_keys=["driver_id"]) - customer = Entity(name="customer", join_keys=["customer_id"]) driver_stats = FeatureView( name="driver_hourly_stats", - entities=[driver], + entities=["driver_id"], ttl=timedelta(seconds=10), online=False, batch_source=FileSource(path="driver_stats.parquet"), @@ -489,7 +497,7 @@ def test_apply_conflicting_featureview_names(feature_store_with_local_registry): customer_stats = FeatureView( name="DRIVER_HOURLY_STATS", - entities=[customer], + entities=["id"], ttl=timedelta(seconds=10), online=False, batch_source=FileSource(path="customer_stats.parquet"), diff --git a/sdk/python/tests/integration/registration/test_inference.py b/sdk/python/tests/integration/registration/test_inference.py index aab79af709..8b719eb733 100644 --- a/sdk/python/tests/integration/registration/test_inference.py +++ b/sdk/python/tests/integration/registration/test_inference.py @@ -23,7 +23,8 @@ from feast.field import Field from feast.inference import ( update_data_sources_with_inferred_event_timestamp_col, - update_feature_views_with_inferred_features_and_entities, + update_entities_with_inferred_types_from_feature_views, + update_feature_views_with_inferred_features, ) from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import ( SparkSource, @@ -37,6 +38,47 @@ ) +def test_update_entities_with_inferred_types_from_feature_views( + simple_dataset_1, simple_dataset_2 +): + with prep_file_source( + df=simple_dataset_1, timestamp_field="ts_1" + ) as file_source, prep_file_source( + df=simple_dataset_2, timestamp_field="ts_1" + ) as file_source_2: + + fv1 = FeatureView( + name="fv1", entities=["id"], batch_source=file_source, ttl=None, + ) + fv2 = FeatureView( + name="fv2", entities=["id"], batch_source=file_source_2, ttl=None, + ) + + actual_1 = Entity(name="id", join_keys=["id_join_key"]) + actual_2 = Entity(name="id", join_keys=["id_join_key"]) + + update_entities_with_inferred_types_from_feature_views( + [actual_1], [fv1], RepoConfig(provider="local", project="test") + ) + update_entities_with_inferred_types_from_feature_views( + [actual_2], [fv2], RepoConfig(provider="local", project="test") + ) + assert actual_1 == Entity( + name="id", join_keys=["id_join_key"], value_type=ValueType.INT64 + ) + assert actual_2 == Entity( + name="id", join_keys=["id_join_key"], value_type=ValueType.STRING + ) + + with pytest.raises(RegistryInferenceFailure): + # two viable data types + update_entities_with_inferred_types_from_feature_views( + [Entity(name="id", join_keys=["id_join_key"])], + [fv1, fv2], + RepoConfig(provider="local", project="test"), + ) + + def test_infer_datasource_names_file(): file_path = "path/to/test.csv" data_source = FileSource(path=file_path) @@ -245,7 +287,7 @@ def test_view_with_missing_feature(features_df: pd.DataFrame) -> pd.DataFrame: test_view_with_missing_feature.infer_features() -def test_update_feature_views_with_inferred_features_and_entities(): +def test_update_feature_views_with_inferred_features(): file_source = FileSource(name="test", path="test path") entity1 = Entity(name="test1", join_keys=["test_column_1"]) entity2 = Entity(name="test2", join_keys=["test_column_2"]) @@ -270,26 +312,23 @@ def test_update_feature_views_with_inferred_features_and_entities(): ) assert len(feature_view_1.schema) == 2 - assert len(feature_view_1.features) == 1 + assert len(feature_view_1.features) == 2 # The entity field should be deleted from the schema and features of the feature view. - update_feature_views_with_inferred_features_and_entities( + update_feature_views_with_inferred_features( [feature_view_1], [entity1], RepoConfig(provider="local", project="test") ) - assert len(feature_view_1.schema) == 2 + assert len(feature_view_1.schema) == 1 assert len(feature_view_1.features) == 1 assert len(feature_view_2.schema) == 3 - assert len(feature_view_2.features) == 1 + assert len(feature_view_2.features) == 3 # The entity fields should be deleted from the schema and features of the feature view. - update_feature_views_with_inferred_features_and_entities( + update_feature_views_with_inferred_features( [feature_view_2], [entity1, entity2], RepoConfig(provider="local", project="test"), ) - assert len(feature_view_2.schema) == 3 + assert len(feature_view_2.schema) == 1 assert len(feature_view_2.features) == 1 - - -# TODO(felixwang9817): Add tests that interact with field mapping. diff --git a/sdk/python/tests/integration/registration/test_registry.py b/sdk/python/tests/integration/registration/test_registry.py index dde31ac9e0..f011d73d2d 100644 --- a/sdk/python/tests/integration/registration/test_registry.py +++ b/sdk/python/tests/integration/registration/test_registry.py @@ -26,6 +26,7 @@ from feast.feature_view import FeatureView from feast.field import Field from feast.on_demand_feature_view import RequestSource, on_demand_feature_view +from feast.protos.feast.types import Value_pb2 as ValueProto from feast.registry import Registry from feast.repo_config import RegistryConfig from feast.types import Array, Bytes, Float32, Int32, Int64, String @@ -72,7 +73,10 @@ def s3_registry() -> Registry: ) def test_apply_entity_success(test_registry): entity = Entity( - name="driver_car_id", description="Car driver id", tags={"team": "matchmaking"}, + name="driver_car_id", + description="Car driver id", + value_type=ValueType.STRING, + tags={"team": "matchmaking"}, ) project = "project" @@ -86,6 +90,7 @@ def test_apply_entity_success(test_registry): assert ( len(entities) == 1 and entity.name == "driver_car_id" + and entity.value_type == ValueType(ValueProto.ValueType.STRING) and entity.description == "Car driver id" and "team" in entity.tags and entity.tags["team"] == "matchmaking" @@ -94,6 +99,7 @@ def test_apply_entity_success(test_registry): entity = test_registry.get_entity("driver_car_id", project) assert ( entity.name == "driver_car_id" + and entity.value_type == ValueType(ValueProto.ValueType.STRING) and entity.description == "Car driver id" and "team" in entity.tags and entity.tags["team"] == "matchmaking" @@ -116,7 +122,10 @@ def test_apply_entity_success(test_registry): ) def test_apply_entity_integration(test_registry): entity = Entity( - name="driver_car_id", description="Car driver id", tags={"team": "matchmaking"}, + name="driver_car_id", + description="Car driver id", + value_type=ValueType.STRING, + tags={"team": "matchmaking"}, ) project = "project" @@ -130,6 +139,7 @@ def test_apply_entity_integration(test_registry): assert ( len(entities) == 1 and entity.name == "driver_car_id" + and entity.value_type == ValueType(ValueProto.ValueType.STRING) and entity.description == "Car driver id" and "team" in entity.tags and entity.tags["team"] == "matchmaking" @@ -138,6 +148,7 @@ def test_apply_entity_integration(test_registry): entity = test_registry.get_entity("driver_car_id", project) assert ( entity.name == "driver_car_id" + and entity.value_type == ValueType(ValueProto.ValueType.STRING) and entity.description == "Car driver id" and "team" in entity.tags and entity.tags["team"] == "matchmaking" @@ -162,8 +173,6 @@ def test_apply_feature_view_success(test_registry): created_timestamp_column="timestamp", ) - entity = Entity(name="fs1_my_entity_1", join_keys=["test"]) - fv1 = FeatureView( name="my_feature_view_1", schema=[ @@ -172,7 +181,7 @@ def test_apply_feature_view_success(test_registry): Field(name="fs1_my_feature_3", dtype=Array(String)), Field(name="fs1_my_feature_4", dtype=Array(Bytes)), ], - entities=[entity], + entities=["fs1_my_entity_1"], tags={"team": "matchmaking"}, batch_source=batch_source, ttl=timedelta(minutes=5), @@ -244,12 +253,10 @@ def test_modify_feature_views_success(test_registry, request_source_schema): request_source = RequestSource(name="request_source", schema=request_source_schema,) - entity = Entity(name="fs1_my_entity_1", join_keys=["test"]) - fv1 = FeatureView( name="my_feature_view_1", schema=[Field(name="fs1_my_feature_1", dtype=Int64)], - entities=[entity], + entities=["fs1_my_entity_1"], tags={"team": "matchmaking"}, batch_source=batch_source, ttl=timedelta(minutes=5), @@ -362,8 +369,6 @@ def test_apply_feature_view_integration(test_registry): created_timestamp_column="timestamp", ) - entity = Entity(name="fs1_my_entity_1", join_keys=["test"]) - fv1 = FeatureView( name="my_feature_view_1", schema=[ @@ -372,7 +377,7 @@ def test_apply_feature_view_integration(test_registry): Field(name="fs1_my_feature_3", dtype=Array(String)), Field(name="fs1_my_feature_4", dtype=Array(Bytes)), ], - entities=[entity], + entities=["fs1_my_entity_1"], tags={"team": "matchmaking"}, batch_source=batch_source, ttl=timedelta(minutes=5), @@ -439,8 +444,6 @@ def test_apply_data_source(test_registry: Registry): created_timestamp_column="timestamp", ) - entity = Entity(name="fs1_my_entity_1", join_keys=["test"]) - fv1 = FeatureView( name="my_feature_view_1", schema=[ @@ -449,7 +452,7 @@ def test_apply_data_source(test_registry: Registry): Field(name="fs1_my_feature_3", dtype=Array(String)), Field(name="fs1_my_feature_4", dtype=Array(Bytes)), ], - entities=[entity], + entities=["fs1_my_entity_1"], tags={"team": "matchmaking"}, batch_source=batch_source, ttl=timedelta(minutes=5), @@ -496,7 +499,10 @@ def test_commit(): test_registry = Registry(registry_config, None) entity = Entity( - name="driver_car_id", description="Car driver id", tags={"team": "matchmaking"}, + name="driver_car_id", + description="Car driver id", + value_type=ValueType.STRING, + tags={"team": "matchmaking"}, ) project = "project" @@ -511,6 +517,7 @@ def test_commit(): assert ( len(entities) == 1 and entity.name == "driver_car_id" + and entity.value_type == ValueType(ValueProto.ValueType.STRING) and entity.description == "Car driver id" and "team" in entity.tags and entity.tags["team"] == "matchmaking" @@ -519,6 +526,7 @@ def test_commit(): entity = test_registry.get_entity("driver_car_id", project, allow_cache=True) assert ( entity.name == "driver_car_id" + and entity.value_type == ValueType(ValueProto.ValueType.STRING) and entity.description == "Car driver id" and "team" in entity.tags and entity.tags["team"] == "matchmaking" @@ -544,6 +552,7 @@ def test_commit(): assert ( len(entities) == 1 and entity.name == "driver_car_id" + and entity.value_type == ValueType(ValueProto.ValueType.STRING) and entity.description == "Car driver id" and "team" in entity.tags and entity.tags["team"] == "matchmaking" @@ -552,6 +561,7 @@ def test_commit(): entity = test_registry.get_entity("driver_car_id", project) assert ( entity.name == "driver_car_id" + and entity.value_type == ValueType(ValueProto.ValueType.STRING) and entity.description == "Car driver id" and "team" in entity.tags and entity.tags["team"] == "matchmaking" diff --git a/sdk/python/tests/integration/registration/test_universal_types.py b/sdk/python/tests/integration/registration/test_universal_types.py index cfb0393d2f..81fa0200fd 100644 --- a/sdk/python/tests/integration/registration/test_universal_types.py +++ b/sdk/python/tests/integration/registration/test_universal_types.py @@ -9,17 +9,8 @@ import pytest from feast.infra.offline_stores.offline_store import RetrievalJob -from feast.types import ( - Array, - Bool, - FeastType, - Float32, - Float64, - Int32, - Int64, - String, - UnixTimestamp, -) +from feast.types import Array, Bool, Float32, Int32, Int64, UnixTimestamp +from feast.value_type import ValueType from tests.data.data_creator import create_dataset from tests.integration.feature_repos.repo_configuration import ( FULL_REPO_CONFIGS, @@ -35,11 +26,11 @@ def populate_test_configs(offline: bool): entity_type_feature_dtypes = [ - (Int32, "int32"), - (Int64, "int64"), - (String, "float"), - (String, "bool"), - (Int32, "datetime"), + (ValueType.INT32, "int32"), + (ValueType.INT64, "int64"), + (ValueType.STRING, "float"), + (ValueType.STRING, "bool"), + (ValueType.INT32, "datetime"), ] configs: List[TypeTestConfig] = [] for test_repo_config in FULL_REPO_CONFIGS: @@ -69,7 +60,7 @@ def populate_test_configs(offline: bool): @dataclass(frozen=True, repr=True) class TypeTestConfig: - entity_type: FeastType + entity_type: ValueType feature_dtype: str feature_is_list: bool has_empty_list: bool @@ -126,7 +117,6 @@ def get_fixtures(request): config.feature_is_list, config.has_empty_list, data_source, - config.entity_type, ) def cleanup(): @@ -148,29 +138,20 @@ def test_entity_inference_types_match(offline_types_test_fixtures): environment, config, data_source, fv = offline_types_test_fixtures fs = environment.feature_store - # TODO(felixwang9817): Refactor this by finding a better way to force type inference. - # Override the schema and entity_columns to force entity inference. - entity = driver() - fv.schema = list(filter(lambda x: x.name != entity.join_key, fv.schema)) - fv.entity_columns = [] + # Don't specify value type in entity to force inference + entity = driver(value_type=ValueType.UNKNOWN) fs.apply([fv, entity]) entities = fs.list_entities() entity_type_to_expected_inferred_entity_type = { - Int32: Int64, - Int64: Int64, - Float32: Float64, - String: String, + ValueType.INT32: ValueType.INT64, + ValueType.INT64: ValueType.INT64, + ValueType.FLOAT: ValueType.DOUBLE, + ValueType.STRING: ValueType.STRING, } - for entity in entities: - entity_columns = list( - filter(lambda x: x.name == entity.join_key, fv.entity_columns) - ) - assert len(entity_columns) == 1 - entity_column = entity_columns[0] assert ( - entity_column.dtype + entity.value_type == entity_type_to_expected_inferred_entity_type[config.entity_type] ) @@ -192,12 +173,13 @@ def test_feature_get_historical_features_types_match(offline_types_test_fixtures config.feature_is_list, config.has_empty_list, data_source, - config.entity_type, ) fs.apply([fv, entity]) entity_df = pd.DataFrame() - entity_df["driver_id"] = ["1", "3"] if config.entity_type == String else [1, 3] + entity_df["driver_id"] = ( + ["1", "3"] if config.entity_type == ValueType.STRING else [1, 3] + ) ts = pd.Timestamp(datetime.utcnow()).round("ms") entity_df["ts"] = [ ts - timedelta(hours=4), @@ -240,11 +222,10 @@ def test_feature_get_online_features_types_match(online_types_test_fixtures): config.feature_is_list, config.has_empty_list, data_source, - config.entity_type, ) fs = environment.feature_store features = [fv.name + ":value"] - entity = driver() + entity = driver(value_type=config.entity_type) fs.apply([fv, entity]) fs.materialize( environment.start_date, @@ -253,7 +234,7 @@ def test_feature_get_online_features_types_match(online_types_test_fixtures): # we can successfully infer type even from all empty values ) - driver_id_value = "1" if config.entity_type == String else 1 + driver_id_value = "1" if config.entity_type == ValueType.STRING else 1 online_features = fs.get_online_features( features=features, entity_rows=[{"driver_id": driver_id_value}], ).to_dict() @@ -286,7 +267,7 @@ def test_feature_get_online_features_types_match(online_types_test_fixtures): def create_feature_view( - name, feature_dtype, feature_is_list, has_empty_list, data_source, entity_type + name, feature_dtype, feature_is_list, has_empty_list, data_source ): if feature_is_list is True: if feature_dtype == "int32": @@ -311,9 +292,7 @@ def create_feature_view( elif feature_dtype == "datetime": dtype = UnixTimestamp - return driver_feature_view( - data_source, name=name, dtype=dtype, entity_type=entity_type - ) + return driver_feature_view(data_source, name=name, dtype=dtype,) def assert_expected_historical_feature_types( diff --git a/sdk/python/tests/integration/scaffolding/test_partial_apply.py b/sdk/python/tests/integration/scaffolding/test_partial_apply.py index e5a7206b96..3ab9bf196f 100644 --- a/sdk/python/tests/integration/scaffolding/test_partial_apply.py +++ b/sdk/python/tests/integration/scaffolding/test_partial_apply.py @@ -2,7 +2,7 @@ import pytest -from feast import BigQuerySource, Entity, FeatureView, Field +from feast import BigQuerySource, FeatureView, Field from feast.types import Float32, String from tests.utils.cli_utils import CliRunner, get_example_repo from tests.utils.online_read_write_test import basic_rw_test @@ -19,7 +19,6 @@ def test_partial() -> None: with runner.local_repo( get_example_repo("example_feature_repo_1.py"), "bigquery" ) as store: - driver = Entity(name="driver", join_keys=["test"]) driver_locations_source = BigQuerySource( table="feast-oss.public.drivers", @@ -29,13 +28,12 @@ def test_partial() -> None: driver_locations_100 = FeatureView( name="driver_locations_100", - entities=[driver], + entities=["driver"], ttl=timedelta(days=1), schema=[ Field(name="lat", dtype=Float32), Field(name="lon", dtype=String), Field(name="name", dtype=String), - Field(name="test", dtype=String), ], online=True, batch_source=driver_locations_source, diff --git a/sdk/python/tests/unit/diff/test_registry_diff.py b/sdk/python/tests/unit/diff/test_registry_diff.py index ae10c834c8..483dae73e2 100644 --- a/sdk/python/tests/unit/diff/test_registry_diff.py +++ b/sdk/python/tests/unit/diff/test_registry_diff.py @@ -2,36 +2,34 @@ diff_registry_objects, tag_objects_for_keep_delete_update_add, ) -from feast.entity import Entity from feast.feature_view import FeatureView from tests.utils.data_source_utils import prep_file_source def test_tag_objects_for_keep_delete_update_add(simple_dataset_1): with prep_file_source(df=simple_dataset_1, timestamp_field="ts_1") as file_source: - entity = Entity(name="id", join_keys=["id"]) to_delete = FeatureView( - name="to_delete", entities=[entity], batch_source=file_source, ttl=None, + name="to_delete", entities=["id"], batch_source=file_source, ttl=None, ) unchanged_fv = FeatureView( - name="fv1", entities=[entity], batch_source=file_source, ttl=None, + name="fv1", entities=["id"], batch_source=file_source, ttl=None, ) pre_changed = FeatureView( name="fv2", - entities=[entity], + entities=["id"], batch_source=file_source, ttl=None, tags={"when": "before"}, ) post_changed = FeatureView( name="fv2", - entities=[entity], + entities=["id"], batch_source=file_source, ttl=None, tags={"when": "after"}, ) to_add = FeatureView( - name="to_add", entities=[entity], batch_source=file_source, ttl=None, + name="to_add", entities=["id"], batch_source=file_source, ttl=None, ) keep, delete, update, add = tag_objects_for_keep_delete_update_add( @@ -54,17 +52,16 @@ def test_tag_objects_for_keep_delete_update_add(simple_dataset_1): def test_diff_registry_objects_feature_views(simple_dataset_1): with prep_file_source(df=simple_dataset_1, timestamp_field="ts_1") as file_source: - entity = Entity(name="id", join_keys=["id"]) pre_changed = FeatureView( name="fv2", - entities=[entity], + entities=["id"], batch_source=file_source, ttl=None, tags={"when": "before"}, ) post_changed = FeatureView( name="fv2", - entities=[entity], + entities=["id"], batch_source=file_source, ttl=None, tags={"when": "after"}, diff --git a/sdk/python/tests/unit/infra/test_provider.py b/sdk/python/tests/unit/infra/test_provider.py index 5ed5603b03..43c09760e9 100644 --- a/sdk/python/tests/unit/infra/test_provider.py +++ b/sdk/python/tests/unit/infra/test_provider.py @@ -20,13 +20,14 @@ from feast.field import Field from feast.infra.provider import _get_column_names from feast.types import String +from feast.value_type import ValueType def test_get_column_names_preserves_feature_ordering(): - entity = Entity("my-entity", description="My entity") + entity = Entity("my-entity", description="My entity", value_type=ValueType.STRING) fv = FeatureView( name="my-fv", - entities=[entity], + entities=["my-entity"], ttl=timedelta(days=1), batch_source=BigQuerySource(table="non-existent-mock"), schema=[ diff --git a/sdk/python/tests/unit/test_entity.py b/sdk/python/tests/unit/test_entity.py index 04a857ddef..254a975f67 100644 --- a/sdk/python/tests/unit/test_entity.py +++ b/sdk/python/tests/unit/test_entity.py @@ -20,14 +20,19 @@ def test_join_key_default(): with pytest.deprecated_call(): - entity = Entity("my-entity", description="My entity") + entity = Entity( + "my-entity", description="My entity", value_type=ValueType.STRING + ) assert entity.join_key == "my-entity" def test_entity_class_contains_tags(): with pytest.deprecated_call(): entity = Entity( - "my-entity", description="My entity", tags={"key1": "val1", "key2": "val2"}, + "my-entity", + description="My entity", + value_type=ValueType.STRING, + tags={"key1": "val1", "key2": "val2"}, ) assert "key1" in entity.tags.keys() and entity.tags["key1"] == "val1" assert "key2" in entity.tags.keys() and entity.tags["key2"] == "val2" @@ -35,18 +40,20 @@ def test_entity_class_contains_tags(): def test_entity_without_tags_empty_dict(): with pytest.deprecated_call(): - entity = Entity("my-entity", description="My entity") + entity = Entity( + "my-entity", description="My entity", value_type=ValueType.STRING + ) assert entity.tags == dict() assert len(entity.tags) == 0 def test_entity_without_description(): with pytest.deprecated_call(): - Entity("my-entity") + Entity("my-entity", value_type=ValueType.STRING) def test_name_not_specified(): - assertpy.assert_that(lambda: Entity()).raises(ValueError) + assertpy.assert_that(lambda: Entity(value_type=ValueType.STRING)).raises(ValueError) def test_multiple_args(): @@ -54,19 +61,15 @@ def test_multiple_args(): def test_name_keyword(recwarn): - Entity(name="my-entity") + Entity(name="my-entity", value_type=ValueType.STRING) assert len(recwarn) == 0 - Entity(name="my-entity", join_key="test") - assert len(recwarn) == 1 - Entity(name="my-entity", join_keys=["test"]) - assert len(recwarn) == 1 def test_hash(): - entity1 = Entity(name="my-entity") - entity2 = Entity(name="my-entity") - entity3 = Entity(name="my-entity", join_keys=["not-my-entity"]) - entity4 = Entity(name="my-entity", join_keys=["not-my-entity"], description="test") + entity1 = Entity(name="my-entity", value_type=ValueType.STRING) + entity2 = Entity(name="my-entity", value_type=ValueType.STRING) + entity3 = Entity(name="my-entity", value_type=ValueType.FLOAT) + entity4 = Entity(name="my-entity", value_type=ValueType.FLOAT, description="test") s1 = {entity1, entity2} assert len(s1) == 1 diff --git a/sdk/python/tests/unit/test_feature_view.py b/sdk/python/tests/unit/test_feature_view.py index 1ef36081ec..80a583806e 100644 --- a/sdk/python/tests/unit/test_feature_view.py +++ b/sdk/python/tests/unit/test_feature_view.py @@ -62,7 +62,3 @@ def test_hash(): s4 = {feature_view_1, feature_view_2, feature_view_3, feature_view_4} assert len(s4) == 3 - - -# TODO(felixwang9817): Add tests for proto conversion. -# TODO(felixwang9817): Add tests for field mapping logic. diff --git a/sdk/python/tests/unit/test_unit_feature_store.py b/sdk/python/tests/unit/test_unit_feature_store.py index 0c13dffa62..6f9dd6acb0 100644 --- a/sdk/python/tests/unit/test_unit_feature_store.py +++ b/sdk/python/tests/unit/test_unit_feature_store.py @@ -17,7 +17,7 @@ class MockFeatureView: projection: MockFeatureViewProjection -def test_get_unique_entities(): +def test__get_unique_entities(): entity_values = { "entity_1": [Value(int64_val=1), Value(int64_val=2), Value(int64_val=1)], "entity_2": [ diff --git a/sdk/python/tests/utils/online_write_benchmark.py b/sdk/python/tests/utils/online_write_benchmark.py index 9f2f8ba60d..82ffc8e98b 100644 --- a/sdk/python/tests/utils/online_write_benchmark.py +++ b/sdk/python/tests/utils/online_write_benchmark.py @@ -17,13 +17,13 @@ from feast.infra.provider import _convert_arrow_to_proto from feast.repo_config import RepoConfig from feast.types import Float32, Int32 +from feast.value_type import ValueType def create_driver_hourly_stats_feature_view(source): - driver = Entity(name="driver", join_keys=["driver_id"]) driver_stats_feature_view = FeatureView( name="driver_stats", - entities=[driver], + entities=["driver_id"], schema=[ Field(name="conv_rate", dtype=Float32), Field(name="acc_rate", dtype=Float32), @@ -61,7 +61,7 @@ def benchmark_writes(): # This is just to set data source to something, we're not reading from parquet source here. parquet_path = os.path.join(temp_dir, "data.parquet") - driver = Entity(name="driver_id") + driver = Entity(name="driver_id", value_type=ValueType.INT64) table = create_driver_hourly_stats_feature_view( create_driver_hourly_stats_source(parquet_path=parquet_path) )