diff --git a/examples/java-demo/feature_repo/driver_repo.py b/examples/java-demo/feature_repo/driver_repo.py index c91e5a40be..e17a5d9cf8 100644 --- a/examples/java-demo/feature_repo/driver_repo.py +++ b/examples/java-demo/feature_repo/driver_repo.py @@ -7,14 +7,14 @@ from google.protobuf.duration_pb2 import Duration from feast.field import Field -from feast import Entity, Feature, BatchFeatureView, FileSource, ValueType +from feast import Entity, Feature, BatchFeatureView, FileSource driver_hourly_stats = FileSource( path="data/driver_stats_with_string.parquet", timestamp_field="event_timestamp", created_timestamp_column="created", ) -driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",) +driver = Entity(name="driver_id", description="driver id",) driver_hourly_stats_view = BatchFeatureView( name="driver_hourly_stats", entities=["driver_id"], diff --git a/go/embedded/online_features.go b/go/embedded/online_features.go index d019bb72ac..55c19e2a0a 100644 --- a/go/embedded/online_features.go +++ b/go/embedded/online_features.go @@ -72,12 +72,6 @@ func (s *OnlineFeatureService) GetEntityTypesMap(featureRefs []string) (map[stri viewNames[viewName] = nil } - entities, _ := s.fs.ListEntities(true) - entitiesByName := make(map[string]*model.Entity) - for _, entity := range entities { - entitiesByName[entity.Name] = entity - } - joinKeyTypes := make(map[string]int32) for viewName := range viewNames { @@ -86,9 +80,8 @@ func (s *OnlineFeatureService) GetEntityTypesMap(featureRefs []string) (map[stri // skip on demand feature views continue } - for _, entityName := range view.Entities { - entity := entitiesByName[entityName] - joinKeyTypes[entity.JoinKey] = int32(entity.ValueType.Number()) + for _, entityColumn := range view.EntityColumns { + joinKeyTypes[entityColumn.Name] = int32(entityColumn.Dtype.Number()) } } @@ -103,21 +96,14 @@ func (s *OnlineFeatureService) GetEntityTypesMapByFeatureService(featureServiceN joinKeyTypes := 
make(map[string]int32) - entities, _ := s.fs.ListEntities(true) - entitiesByName := make(map[string]*model.Entity) - for _, entity := range entities { - entitiesByName[entity.Name] = entity - } - for _, projection := range featureService.Projections { view, err := s.fs.GetFeatureView(projection.Name, true) if err != nil { // skip on demand feature views continue } - for _, entityName := range view.Entities { - entity := entitiesByName[entityName] - joinKeyTypes[entity.JoinKey] = int32(entity.ValueType.Number()) + for _, entityColumn := range view.EntityColumns { + joinKeyTypes[entityColumn.Name] = int32(entityColumn.Dtype.Number()) } } diff --git a/go/internal/feast/featurestore.go b/go/internal/feast/featurestore.go index 4ecd781b74..b0fc987fb4 100644 --- a/go/internal/feast/featurestore.go +++ b/go/internal/feast/featurestore.go @@ -132,7 +132,7 @@ func (fs *FeatureStore) GetOnlineFeatures( if entitylessCase { dummyEntityColumn := &prototypes.RepeatedValue{Val: make([]*prototypes.Value, numRows)} for index := 0; index < numRows; index++ { - dummyEntityColumn.Val[index] = &model.DUMMY_ENTITY + dummyEntityColumn.Val[index] = &model.DUMMY_ENTITY_VALUE } joinKeyToEntityValues[model.DUMMY_ENTITY_ID] = dummyEntityColumn } @@ -272,7 +272,7 @@ func (fs *FeatureStore) GetFeatureView(featureViewName string, hideDummyEntity b return nil, err } if fv.HasEntity(model.DUMMY_ENTITY_NAME) && hideDummyEntity { - fv.Entities = []string{} + fv.EntityNames = []string{} } return fv, nil } diff --git a/go/internal/feast/model/basefeatureview.go b/go/internal/feast/model/basefeatureview.go index 28ef7231fd..1bdf614c25 100644 --- a/go/internal/feast/model/basefeatureview.go +++ b/go/internal/feast/model/basefeatureview.go @@ -8,15 +8,15 @@ import ( type BaseFeatureView struct { Name string - Features []*Feature + Features []*Field Projection *FeatureViewProjection } func NewBaseFeatureView(name string, featureProtos []*core.FeatureSpecV2) *BaseFeatureView { base := 
&BaseFeatureView{Name: name} - features := make([]*Feature, len(featureProtos)) + features := make([]*Field, len(featureProtos)) for index, featureSpecV2 := range featureProtos { - features[index] = NewFeatureFromProto(featureSpecV2) + features[index] = NewFieldFromProto(featureSpecV2) } base.Features = features base.Projection = NewFeatureViewProjectionFromDefinition(base) @@ -43,7 +43,7 @@ func (fv *BaseFeatureView) WithProjection(projection *FeatureViewProjection) (*B } func (fv *BaseFeatureView) ProjectWithFeatures(featureNames []string) *FeatureViewProjection { - features := make([]*Feature, 0) + features := make([]*Field, 0) for _, feature := range fv.Features { for _, allowedFeatureName := range featureNames { if feature.Name == allowedFeatureName { diff --git a/go/internal/feast/model/entity.go b/go/internal/feast/model/entity.go index ac3a5d5f26..5a09edb655 100644 --- a/go/internal/feast/model/entity.go +++ b/go/internal/feast/model/entity.go @@ -2,18 +2,16 @@ package model import ( "github.com/feast-dev/feast/go/protos/feast/core" - "github.com/feast-dev/feast/go/protos/feast/types" ) type Entity struct { - Name string - ValueType types.ValueType_Enum - JoinKey string + Name string + JoinKey string } func NewEntityFromProto(proto *core.Entity) *Entity { - return &Entity{Name: proto.Spec.Name, - ValueType: proto.Spec.ValueType, - JoinKey: proto.Spec.JoinKey, + return &Entity{ + Name: proto.Spec.Name, + JoinKey: proto.Spec.JoinKey, } } diff --git a/go/internal/feast/model/featureview.go b/go/internal/feast/model/featureview.go index 6c198f9994..ceb3736f99 100644 --- a/go/internal/feast/model/featureview.go +++ b/go/internal/feast/model/featureview.go @@ -13,12 +13,13 @@ const ( DUMMY_ENTITY_VAL = "" ) -var DUMMY_ENTITY types.Value = types.Value{Val: &types.Value_StringVal{StringVal: DUMMY_ENTITY_VAL}} +var DUMMY_ENTITY_VALUE types.Value = types.Value{Val: &types.Value_StringVal{StringVal: DUMMY_ENTITY_VAL}} type FeatureView struct { - Base *BaseFeatureView 
- Ttl *durationpb.Duration - Entities []string + Base *BaseFeatureView + Ttl *durationpb.Duration + EntityNames []string + EntityColumns []*Field } func NewFeatureViewFromProto(proto *core.FeatureView) *FeatureView { @@ -26,25 +27,30 @@ func NewFeatureViewFromProto(proto *core.FeatureView) *FeatureView { Ttl: &(*proto.Spec.Ttl), } if len(proto.Spec.Entities) == 0 { - featureView.Entities = []string{DUMMY_ENTITY_NAME} + featureView.EntityNames = []string{DUMMY_ENTITY_NAME} } else { - featureView.Entities = proto.Spec.Entities + featureView.EntityNames = proto.Spec.Entities } + entityColumns := make([]*Field, len(proto.Spec.EntityColumns)) + for i, entityColumn := range proto.Spec.EntityColumns { + entityColumns[i] = NewFieldFromProto(entityColumn) + } + featureView.EntityColumns = entityColumns return featureView } -func (fs *FeatureView) NewFeatureViewFromBase(base *BaseFeatureView) *FeatureView { - ttl := durationpb.Duration{Seconds: fs.Ttl.Seconds, Nanos: fs.Ttl.Nanos} +func (fv *FeatureView) NewFeatureViewFromBase(base *BaseFeatureView) *FeatureView { + ttl := durationpb.Duration{Seconds: fv.Ttl.Seconds, Nanos: fv.Ttl.Nanos} featureView := &FeatureView{Base: base, - Ttl: &ttl, - Entities: fs.Entities, + Ttl: &ttl, + EntityNames: fv.EntityNames, } return featureView } -func (fs *FeatureView) HasEntity(lookup string) bool { - for _, entityName := range fs.Entities { - if entityName == lookup { +func (fv *FeatureView) HasEntity(name string) bool { + for _, entityName := range fv.EntityNames { + if entityName == name { return true } } diff --git a/go/internal/feast/model/featureviewprojection.go b/go/internal/feast/model/featureviewprojection.go index e80e8844ed..fe54774ff1 100644 --- a/go/internal/feast/model/featureviewprojection.go +++ b/go/internal/feast/model/featureviewprojection.go @@ -7,7 +7,7 @@ import ( type FeatureViewProjection struct { Name string NameAlias string - Features []*Feature + Features []*Field JoinKeyMap map[string]string } @@ -24,9 +24,9 @@ 
func NewFeatureViewProjectionFromProto(proto *core.FeatureViewProjection) *Featu JoinKeyMap: proto.JoinKeyMap, } - features := make([]*Feature, len(proto.FeatureColumns)) + features := make([]*Field, len(proto.FeatureColumns)) for index, featureSpecV2 := range proto.FeatureColumns { - features[index] = NewFeatureFromProto(featureSpecV2) + features[index] = NewFieldFromProto(featureSpecV2) } featureProjection.Features = features return featureProjection diff --git a/go/internal/feast/model/feature.go b/go/internal/feast/model/field.go similarity index 63% rename from go/internal/feast/model/feature.go rename to go/internal/feast/model/field.go index d833a8901b..4f72d34686 100644 --- a/go/internal/feast/model/feature.go +++ b/go/internal/feast/model/field.go @@ -5,13 +5,14 @@ import ( "github.com/feast-dev/feast/go/protos/feast/types" ) -type Feature struct { +type Field struct { Name string Dtype types.ValueType_Enum } -func NewFeatureFromProto(proto *core.FeatureSpecV2) *Feature { - return &Feature{Name: proto.Name, +func NewFieldFromProto(proto *core.FeatureSpecV2) *Field { + return &Field{ + Name: proto.Name, Dtype: proto.ValueType, } } diff --git a/go/internal/feast/onlineserving/serving.go b/go/internal/feast/onlineserving/serving.go index 1d0567c354..e2a2df923b 100644 --- a/go/internal/feast/onlineserving/serving.go +++ b/go/internal/feast/onlineserving/serving.go @@ -251,7 +251,7 @@ func GetEntityMaps(requestedFeatureViews []*FeatureViewAndRefs, entities []*mode joinKeyToAliasMap = map[string]string{} } - for _, entityName := range featureView.Entities { + for _, entityName := range featureView.EntityNames { joinKey := entitiesByName[entityName].JoinKey entityNameToJoinKeyMap[entityName] = joinKey @@ -518,7 +518,7 @@ func GroupFeatureRefs(requestedFeatureViews []*FeatureViewAndRefs, joinKeys := make([]string, 0) fv := featuresAndView.View featureNames := featuresAndView.FeatureRefs - for _, entityName := range fv.Entities { + for _, entityName := range 
fv.EntityNames { joinKeys = append(joinKeys, entityNameToJoinKeyMap[entityName]) } diff --git a/go/internal/feast/onlineserving/serving_test.go b/go/internal/feast/onlineserving/serving_test.go index 0a00f546f9..bd4e45a21e 100644 --- a/go/internal/feast/onlineserving/serving_test.go +++ b/go/internal/feast/onlineserving/serving_test.go @@ -20,19 +20,19 @@ func TestGroupingFeatureRefs(t *testing.T) { NameAlias: "aliasViewA", }, }, - Entities: []string{"driver", "customer"}, + EntityNames: []string{"driver", "customer"}, } viewB := &model.FeatureView{ - Base: &model.BaseFeatureView{Name: "viewB"}, - Entities: []string{"driver", "customer"}, + Base: &model.BaseFeatureView{Name: "viewB"}, + EntityNames: []string{"driver", "customer"}, } viewC := &model.FeatureView{ - Base: &model.BaseFeatureView{Name: "viewC"}, - Entities: []string{"driver"}, + Base: &model.BaseFeatureView{Name: "viewC"}, + EntityNames: []string{"driver"}, } viewD := &model.FeatureView{ - Base: &model.BaseFeatureView{Name: "viewD"}, - Entities: []string{"customer"}, + Base: &model.BaseFeatureView{Name: "viewD"}, + EntityNames: []string{"customer"}, } refGroups, _ := GroupFeatureRefs( []*FeatureViewAndRefs{ @@ -105,11 +105,11 @@ func TestGroupingFeatureRefsWithJoinKeyAliases(t *testing.T) { JoinKeyMap: map[string]string{"location_id": "destination_id"}, }, }, - Entities: []string{"location"}, + EntityNames: []string{"location"}, } viewB := &model.FeatureView{ - Base: &model.BaseFeatureView{Name: "viewB"}, - Entities: []string{"location"}, + Base: &model.BaseFeatureView{Name: "viewB"}, + EntityNames: []string{"location"}, } refGroups, _ := GroupFeatureRefs( @@ -164,7 +164,7 @@ func TestGroupingFeatureRefsWithMissingKey(t *testing.T) { JoinKeyMap: map[string]string{"location_id": "destination_id"}, }, }, - Entities: []string{"location"}, + EntityNames: []string{"location"}, } _, err := GroupFeatureRefs( diff --git a/go/internal/feast/server/logging/featureserviceschema.go 
b/go/internal/feast/server/logging/featureserviceschema.go index 5047346c2c..2779982fc0 100644 --- a/go/internal/feast/server/logging/featureserviceschema.go +++ b/go/internal/feast/server/logging/featureserviceschema.go @@ -52,13 +52,12 @@ func generateSchema(featureService *model.FeatureService, entityMap map[string]* features = append(features, fullFeatureName) allFeatureTypes[fullFeatureName] = f.Dtype } - for _, entityName := range fv.Entities { - entity := entityMap[entityName] + for _, entityColumn := range fv.EntityColumns { var joinKey string - if joinKeyAlias, ok := featureProjection.JoinKeyMap[entity.JoinKey]; ok { + if joinKeyAlias, ok := featureProjection.JoinKeyMap[entityColumn.Name]; ok { joinKey = joinKeyAlias } else { - joinKey = entity.JoinKey + joinKey = entityColumn.Name } if _, ok := joinKeysSet[joinKey]; !ok { @@ -66,7 +65,7 @@ func generateSchema(featureService *model.FeatureService, entityMap map[string]* } joinKeysSet[joinKey] = nil - entityJoinKeyToType[joinKey] = entity.ValueType + entityJoinKeyToType[joinKey] = entityColumn.Dtype } } else if odFv, ok := odFvMap[featureViewName]; ok { for _, f := range featureProjection.Features { diff --git a/go/internal/feast/server/logging/featureserviceschema_test.go b/go/internal/feast/server/logging/featureserviceschema_test.go index efcd5ec7fc..6fa1c12e24 100644 --- a/go/internal/feast/server/logging/featureserviceschema_test.go +++ b/go/internal/feast/server/logging/featureserviceschema_test.go @@ -74,9 +74,10 @@ func TestSchemaRetrievalIgnoresEntitiesNotInFeatureService(t *testing.T) { featureService, entities, fvs, odfvs := InitializeFeatureRepoVariablesForTest() entityMap, fvMap, odFvMap := buildFCOMaps(entities, fvs, odfvs) - //Remove entities in featureservice + // Remove entities in featureservice for _, featureView := range fvs { - featureView.Entities = []string{} + featureView.EntityNames = []string{} + featureView.EntityColumns = []*model.Field{} } schema, err := 
generateSchema(featureService, entityMap, fvMap, odFvMap) @@ -126,65 +127,69 @@ func TestSchemaUsesOrderInFeatureService(t *testing.T) { // Initialize all dummy featureservice, entities and featureviews/on demand featureviews for testing. func InitializeFeatureRepoVariablesForTest() (*model.FeatureService, []*model.Entity, []*model.FeatureView, []*model.OnDemandFeatureView) { - f1 := test.CreateNewFeature( + f1 := test.CreateNewField( "int64", types.ValueType_INT64, ) - f2 := test.CreateNewFeature( + f2 := test.CreateNewField( "float32", types.ValueType_FLOAT, ) projection1 := test.CreateNewFeatureViewProjection( "featureView1", "", - []*model.Feature{f1, f2}, + []*model.Field{f1, f2}, map[string]string{}, ) baseFeatureView1 := test.CreateBaseFeatureView( "featureView1", - []*model.Feature{f1, f2}, + []*model.Field{f1, f2}, projection1, ) - featureView1 := test.CreateFeatureView(baseFeatureView1, nil, []string{"driver_id"}) - entity1 := test.CreateNewEntity("driver_id", types.ValueType_INT64, "driver_id") - f3 := test.CreateNewFeature( + entity1 := test.CreateNewEntity("driver_id", "driver_id") + entitycolumn1 := test.CreateNewField( + "driver_id", + types.ValueType_INT64, + ) + featureView1 := test.CreateFeatureView(baseFeatureView1, nil, []string{"driver_id"}, []*model.Field{entitycolumn1}) + f3 := test.CreateNewField( "int32", types.ValueType_INT32, ) - f4 := test.CreateNewFeature( + f4 := test.CreateNewField( "double", types.ValueType_DOUBLE, ) projection2 := test.CreateNewFeatureViewProjection( "featureView2", "", - []*model.Feature{f3, f4}, + []*model.Field{f3, f4}, map[string]string{}, ) baseFeatureView2 := test.CreateBaseFeatureView( "featureView2", - []*model.Feature{f3, f4}, + []*model.Field{f3, f4}, projection2, ) - featureView2 := test.CreateFeatureView(baseFeatureView2, nil, []string{"driver_id"}) + featureView2 := test.CreateFeatureView(baseFeatureView2, nil, []string{"driver_id"}, []*model.Field{entitycolumn1}) - f5 := test.CreateNewFeature( + f5 := 
test.CreateNewField( "odfv_f1", types.ValueType_INT32, ) - f6 := test.CreateNewFeature( + f6 := test.CreateNewField( "odfv_f2", types.ValueType_DOUBLE, ) projection3 := test.CreateNewFeatureViewProjection( "od_bf1", "", - []*model.Feature{f5, f6}, + []*model.Field{f5, f6}, map[string]string{}, ) od_bf1 := test.CreateBaseFeatureView( "od_bf1", - []*model.Feature{f5, f6}, + []*model.Field{f5, f6}, projection3, ) odfv := model.NewOnDemandFeatureViewFromBase(od_bf1) diff --git a/go/internal/test/go_integration_test_utils.go b/go/internal/test/go_integration_test_utils.go index 6d236a4319..9b50ae2c36 100644 --- a/go/internal/test/go_integration_test_utils.go +++ b/go/internal/test/go_integration_test_utils.go @@ -193,7 +193,7 @@ func GetProtoFromRecord(rec arrow.Record) (map[string]*types.RepeatedValue, erro return r, nil } -func CreateBaseFeatureView(name string, features []*model.Feature, projection *model.FeatureViewProjection) *model.BaseFeatureView { +func CreateBaseFeatureView(name string, features []*model.Field, projection *model.FeatureViewProjection) *model.BaseFeatureView { return &model.BaseFeatureView{ Name: name, Features: features, @@ -201,16 +201,15 @@ func CreateBaseFeatureView(name string, features []*model.Feature, projection *m } } -func CreateNewEntity(name string, valueType types.ValueType_Enum, joinKey string) *model.Entity { +func CreateNewEntity(name string, joinKey string) *model.Entity { return &model.Entity{ - Name: name, - ValueType: valueType, - JoinKey: joinKey, + Name: name, + JoinKey: joinKey, } } -func CreateNewFeature(name string, dtype types.ValueType_Enum) *model.Feature { - return &model.Feature{Name: name, +func CreateNewField(name string, dtype types.ValueType_Enum) *model.Field { + return &model.Field{Name: name, Dtype: dtype, } } @@ -225,7 +224,7 @@ func CreateNewFeatureService(name string, project string, createdTimestamp *time } } -func CreateNewFeatureViewProjection(name string, nameAlias string, features []*model.Feature, 
joinKeyMap map[string]string) *model.FeatureViewProjection { +func CreateNewFeatureViewProjection(name string, nameAlias string, features []*model.Field, joinKeyMap map[string]string) *model.FeatureViewProjection { return &model.FeatureViewProjection{Name: name, NameAlias: nameAlias, Features: features, @@ -233,10 +232,11 @@ func CreateNewFeatureViewProjection(name string, nameAlias string, features []*m } } -func CreateFeatureView(base *model.BaseFeatureView, ttl *durationpb.Duration, entities []string) *model.FeatureView { +func CreateFeatureView(base *model.BaseFeatureView, ttl *durationpb.Duration, entities []string, entityColumns []*model.Field) *model.FeatureView { return &model.FeatureView{ - Base: base, - Ttl: ttl, - Entities: entities, + Base: base, + Ttl: ttl, + EntityNames: entities, + EntityColumns: entityColumns, } } diff --git a/protos/feast/core/FeatureView.proto b/protos/feast/core/FeatureView.proto index 2662350540..c9e38bf344 100644 --- a/protos/feast/core/FeatureView.proto +++ b/protos/feast/core/FeatureView.proto @@ -35,7 +35,7 @@ message FeatureView { FeatureViewMeta meta = 2; } -// Next available id: 12 +// Next available id: 13 // TODO(adchia): refactor common fields from this and ODFV into separate metadata proto message FeatureViewSpec { // Name of the feature view. Must be unique. Not updated. @@ -44,13 +44,15 @@ message FeatureViewSpec { // Name of Feast project that this feature view belongs to. string project = 2; - // List names of entities to associate with the Features defined in this - // Feature View. Not updatable. + // List of names of entities associated with this feature view. repeated string entities = 3; - // List of specifications for each field defined as part of this feature view. + // List of specifications for each feature defined as part of this feature view. repeated FeatureSpecV2 features = 4; + // List of specifications for each entity defined as part of this feature view. 
+ repeated FeatureSpecV2 entity_columns = 12; + // Description of the feature view. string description = 10; diff --git a/sdk/python/feast/entity.py b/sdk/python/feast/entity.py index 3aaf0f9b69..e29ac4f6ef 100644 --- a/sdk/python/feast/entity.py +++ b/sdk/python/feast/entity.py @@ -31,7 +31,7 @@ class Entity: Attributes: name: The unique name of the entity. - value_type: The type of the entity, such as string or float. + value_type (deprecated): The type of the entity, such as string or float. join_key: A property that uniquely identifies different entities within the collection. The join_key property is typically used for joining entities with their associated features. If not specified, defaults to the name. @@ -60,7 +60,7 @@ def __init__( self, *args, name: Optional[str] = None, - value_type: ValueType = ValueType.UNKNOWN, + value_type: Optional[ValueType] = None, description: str = "", join_key: Optional[str] = None, tags: Optional[Dict[str, str]] = None, @@ -72,7 +72,7 @@ def __init__( Args: name: The unique name of the entity. - value_type: The type of the entity, such as string or float. + value_type (deprecated): The type of the entity, such as string or float. description: A human-readable description. join_key (deprecated): A property that uniquely identifies different entities within the collection. The join_key property is typically used for joining entities @@ -104,8 +104,23 @@ def __init__( if not self.name: raise ValueError("Name needs to be specified") - self.value_type = value_type + if value_type: + warnings.warn( + ( + "The `value_type` parameter is being deprecated. Instead, the type of an entity " + "should be specified as a Field in the schema of a feature view. Feast 0.22 and " + "onwards will not support the `value_type` parameter. The `entities` parameter of " + "feature views should also be changed to a List[Entity] instead of a List[str]; if " + "this is not done, entity columns will be mistakenly interpreted as feature columns." 
+ ), + DeprecationWarning, + ) + self.value_type = value_type or ValueType.UNKNOWN + # For now, both the `join_key` and `join_keys` attributes are set correctly, + # so both are usable. + # TODO(felixwang9817): Remove the usage of `join_key` throughout the codebase + # when the usage of `join_key` as a parameter is removed. if join_key: warnings.warn( ( @@ -125,6 +140,8 @@ def __init__( self.join_key = join_keys[0] else: self.join_key = join_key if join_key else self.name + if not self.join_keys: + self.join_keys = [self.join_key] self.description = description self.tags = tags if tags is not None else {} self.owner = owner @@ -153,6 +170,9 @@ def __eq__(self, other): def __str__(self): return str(MessageToJson(self.to_proto())) + def __lt__(self, other): + return self.name < other.name + def is_valid(self): """ Validates the state of this entity locally. diff --git a/sdk/python/feast/feature_logging.py b/sdk/python/feast/feature_logging.py index acc965ac44..195a703764 100644 --- a/sdk/python/feast/feature_logging.py +++ b/sdk/python/feast/feature_logging.py @@ -14,7 +14,6 @@ from feast.protos.feast.core.FeatureService_pb2 import ( LoggingConfig as LoggingConfigProto, ) -from feast.types import from_value_type if TYPE_CHECKING: from feast import FeatureService @@ -82,14 +81,11 @@ def get_schema(self, registry: "Registry") -> pa.Schema: fields[field.name] = FEAST_TYPE_TO_ARROW_TYPE[field.dtype] else: - for entity_name in feature_view.entities: - entity = registry.get_entity(entity_name, self._project) + for entity_column in feature_view.entity_columns: join_key = projection.join_key_map.get( - entity.join_key, entity.join_key + entity_column.name, entity_column.name ) - fields[join_key] = FEAST_TYPE_TO_ARROW_TYPE[ - from_value_type(entity.value_type) - ] + fields[join_key] = FEAST_TYPE_TO_ARROW_TYPE[entity_column.dtype] # system columns fields[REQUEST_ID_FIELD] = pa.string() diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 
13c73612f0..7cbb081702 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -64,8 +64,7 @@ ) from feast.inference import ( update_data_sources_with_inferred_event_timestamp_col, - update_entities_with_inferred_types_from_feature_views, - update_feature_views_with_inferred_features, + update_feature_views_with_inferred_features_and_entities, ) from feast.infra.infra_object import Infra from feast.infra.provider import Provider, RetrievalJob, get_provider @@ -254,6 +253,7 @@ def _list_feature_views( ): if hide_dummy_entity and fv.entities[0] == DUMMY_ENTITY_NAME: fv.entities = [] + fv.entity_columns = [] feature_views.append(fv) return feature_views @@ -476,11 +476,7 @@ def _make_inferences( views_to_update: List[FeatureView], odfvs_to_update: List[OnDemandFeatureView], ): - """Makes inferences for entities, feature views, and odfvs.""" - update_entities_with_inferred_types_from_feature_views( - entities_to_update, views_to_update, self.config - ) - + """Makes inferences for data sources, feature views, and odfvs.""" update_data_sources_with_inferred_event_timestamp_col( data_sources_to_update, self.config ) @@ -491,7 +487,7 @@ def _make_inferences( # New feature views may reference previously applied entities. entities = self._list_entities() - update_feature_views_with_inferred_features( + update_feature_views_with_inferred_features_and_entities( views_to_update, entities + entities_to_update, self.config ) @@ -517,11 +513,11 @@ def _plan( Examples: Generate a plan adding an Entity and a FeatureView. 
- >>> from feast import FeatureStore, Entity, FeatureView, Feature, ValueType, FileSource, RepoConfig + >>> from feast import FeatureStore, Entity, FeatureView, Feature, FileSource, RepoConfig >>> from feast.feature_store import RepoContents >>> from datetime import timedelta >>> fs = FeatureStore(repo_path="feature_repo") - >>> driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id") + >>> driver = Entity(name="driver_id", description="driver id") >>> driver_hourly_stats = FileSource( ... path="feature_repo/data/driver_stats.parquet", ... timestamp_field="event_timestamp", @@ -529,7 +525,7 @@ def _plan( ... ) >>> driver_hourly_stats_view = FeatureView( ... name="driver_hourly_stats", - ... entities=["driver_id"], + ... entities=[driver], ... ttl=timedelta(seconds=86400 * 1), ... batch_source=driver_hourly_stats, ... ) @@ -629,10 +625,10 @@ def apply( Examples: Register an Entity and a FeatureView. - >>> from feast import FeatureStore, Entity, FeatureView, Feature, ValueType, FileSource, RepoConfig + >>> from feast import FeatureStore, Entity, FeatureView, Feature, FileSource, RepoConfig >>> from datetime import timedelta >>> fs = FeatureStore(repo_path="feature_repo") - >>> driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id") + >>> driver = Entity(name="driver_id", description="driver id") >>> driver_hourly_stats = FileSource( ... path="feature_repo/data/driver_stats.parquet", ... timestamp_field="event_timestamp", @@ -640,7 +636,7 @@ def apply( ... ) >>> driver_hourly_stats_view = FeatureView( ... name="driver_hourly_stats", - ... entities=["driver_id"], + ... entities=[driver], ... ttl=timedelta(seconds=86400 * 1), ... batch_source=driver_hourly_stats, ... ) @@ -687,6 +683,9 @@ def apply( data_sources_to_update = list(data_sources_set_to_update) + # Handle all entityless feature views by using DUMMY_ENTITY as a placeholder entity. 
+ entities_to_update.append(DUMMY_ENTITY) + # Validate all feature views and make inferences. self._validate_all_feature_views( views_to_update, odfvs_to_update, request_views_to_update @@ -695,9 +694,6 @@ def apply( data_sources_to_update, entities_to_update, views_to_update, odfvs_to_update ) - # Handle all entityless feature views by using DUMMY_ENTITY as a placeholder entity. - entities_to_update.append(DUMMY_ENTITY) - # Add all objects to the registry and update the provider's infrastructure. for ds in data_sources_to_update: self._registry.apply_data_source(ds, project=self.project, commit=False) @@ -1541,12 +1537,12 @@ def _get_columnar_entity_values( def _get_entity_maps( self, feature_views ) -> Tuple[Dict[str, str], Dict[str, ValueType], Set[str]]: + # TODO(felixwang9817): Support entities that have different types for different feature views. entities = self._list_entities(allow_cache=True, hide_dummy_entity=False) entity_name_to_join_key_map: Dict[str, str] = {} entity_type_map: Dict[str, ValueType] = {} for entity in entities: entity_name_to_join_key_map[entity.name] = entity.join_key - entity_type_map[entity.name] = entity.value_type for feature_view in feature_views: for entity_name in feature_view.entities: entity = self._registry.get_entity( @@ -1561,7 +1557,11 @@ def _get_entity_maps( entity.join_key, entity.join_key ) entity_name_to_join_key_map[entity_name] = join_key - entity_type_map[join_key] = entity.value_type + for entity_column in feature_view.entity_columns: + entity_type_map[ + entity_column.name + ] = entity_column.dtype.to_value_type() + return ( entity_name_to_join_key_map, entity_type_map, diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 65a4914a8f..fa9cb03305 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -36,7 +36,6 @@ MaterializationInterval as MaterializationIntervalProto, ) from feast.usage import log_exceptions -from feast.value_type import 
ValueType warnings.simplefilter("once", DeprecationWarning) @@ -44,9 +43,7 @@ DUMMY_ENTITY_ID = "__dummy_id" DUMMY_ENTITY_NAME = "__dummy" DUMMY_ENTITY_VAL = "" -DUMMY_ENTITY = Entity( - name=DUMMY_ENTITY_NAME, join_keys=[DUMMY_ENTITY_ID], value_type=ValueType.STRING, -) +DUMMY_ENTITY = Entity(name=DUMMY_ENTITY_NAME, join_keys=[DUMMY_ENTITY_ID],) class FeatureView(BaseFeatureView): @@ -55,7 +52,7 @@ class FeatureView(BaseFeatureView): Attributes: name: The unique name of the feature view. - entities: The list of entities with which this group of features is associated. + entities: The list of names of entities that this feature view is associated with. ttl: The amount of time this group of features lives. A ttl of 0 indicates that this group of features lives forever. Note that large ttl's or a ttl of 0 can result in extremely computationally intensive queries. @@ -65,9 +62,11 @@ class FeatureView(BaseFeatureView): stream_source (optional): The stream source of data where this group of features is stored. This is deprecated in favor of `source`. schema: The schema of the feature view, including feature, timestamp, and entity - columns. - features: The list of features defined as part of this feature view. Each - feature should also be included in the schema. + columns. If not specified, can be inferred from the underlying data source. + entity_columns: The list of entity columns contained in the schema. If not specified, + can be inferred from the underlying data source. + features: The list of feature columns contained in the schema. If not specified, + can be inferred from the underlying data source. online: A boolean indicating whether online retrieval is enabled for this feature view. description: A human-readable description. 
@@ -84,6 +83,7 @@ class FeatureView(BaseFeatureView): batch_source: DataSource stream_source: Optional[DataSource] schema: List[Field] + entity_columns: List[Field] features: List[Field] online: bool description: str @@ -129,14 +129,15 @@ def __init__( owner (optional): The owner of the feature view, typically the email of the primary maintainer. schema (optional): The schema of the feature view, including feature, timestamp, - and entity columns. + and entity columns. If entity columns are included in the schema, a List[Entity] + must be passed to `entities` instead of a List[str]; otherwise, the entity columns + will be mistakenly interpreted as feature columns. source (optional): The source of data for this group of features. May be a stream source, or a batch source. If a stream source, the source should contain a batch_source for backfills & batch materialization. Raises: ValueError: A field mapping conflicts with an Entity or a Feature. """ - positional_attributes = ["name", "entities", "ttl"] _name = name @@ -167,11 +168,21 @@ def __init__( raise ValueError("feature view name needs to be specified") self.name = _name + self.entities = ( [e.name if isinstance(e, Entity) else e for e in _entities] if _entities else [DUMMY_ENTITY_NAME] ) + if _entities and isinstance(_entities[0], str): + warnings.warn( + ( + "The `entities` parameter should be a list of `Entity` objects. " + "Feast 0.22 and onwards will not support passing in a list of " + "strings to define entities." + ), + DeprecationWarning, + ) self._initialize_sources(_name, batch_source, stream_source, source) @@ -206,14 +217,30 @@ def __init__( _schema = [Field.from_feature(feature) for feature in features] self.schema = _schema - # TODO(felixwang9817): Infer which fields in the schema are features, timestamps, - # and entities. For right now we assume that all fields are features, since the - # current `features` parameter only accepts feature columns. 
- _features = _schema + # If a user has added entity fields to schema, then they should also have switched + # to using a List[Entity], in which case entity and feature columns can be separated + # here. Conversely, if the user is still using a List[str], they must not have added + # entity fields, in which case we can set the `features` attribute directly + # equal to the schema. + _features: List[Field] = [] + self.entity_columns = [] + if _entities and len(_entities) > 0 and isinstance(_entities[0], str): + _features = _schema + else: + join_keys = [] + if _entities: + for entity in _entities: + if isinstance(entity, Entity): + join_keys += entity.join_keys + + for field in _schema: + if field.name in join_keys: + self.entity_columns.append(field) + else: + _features.append(field) - cols = [entity for entity in self.entities] + [ - field.name for field in _features - ] + # TODO(felixwang9817): Add more robust validation of features. + cols = [field.name for field in _schema] for col in cols: if ( self.batch_source.field_mapping is not None @@ -276,7 +303,6 @@ def __hash__(self): def __copy__(self): fv = FeatureView( name=self.name, - entities=self.entities, ttl=self.ttl, source=self.batch_source, stream_source=self.stream_source, @@ -284,6 +310,13 @@ def __copy__(self): tags=self.tags, online=self.online, ) + + # This is deliberately set outside of the FV initialization to avoid the deprecation warning. + # TODO(felixwang9817): Move this into the FV initialization when the deprecation warning + # is removed.
+ fv.entities = self.entities + fv.features = copy.copy(self.features) + fv.entity_columns = copy.copy(self.entity_columns) fv.projection = copy.copy(self.projection) return fv @@ -302,12 +335,16 @@ def __eq__(self, other): or self.online != other.online or self.batch_source != other.batch_source or self.stream_source != other.stream_source - or self.schema != other.schema ): return False return True + @property + def join_keys(self) -> List[str]: + """Returns a list of all the join keys.""" + return [entity.name for entity in self.entity_columns] + def ensure_valid(self): """ Validates the state of this feature view locally. @@ -393,7 +430,8 @@ def to_proto(self) -> FeatureViewProto: spec = FeatureViewSpecProto( name=self.name, entities=self.entities, - features=[field.to_proto() for field in self.schema], + entity_columns=[field.to_proto() for field in self.entity_columns], + features=[field.to_proto() for field in self.features], description=self.description, tags=self.tags, owner=self.owner, @@ -424,11 +462,7 @@ def from_proto(cls, feature_view_proto: FeatureViewProto): ) feature_view = cls( name=feature_view_proto.spec.name, - entities=[entity for entity in feature_view_proto.spec.entities], - schema=[ - Field.from_proto(field_proto) - for field_proto in feature_view_proto.spec.features - ], + entities=feature_view_proto.spec.entities, description=feature_view_proto.spec.description, tags=dict(feature_view_proto.spec.tags), owner=feature_view_proto.spec.owner, @@ -443,6 +477,16 @@ def from_proto(cls, feature_view_proto: FeatureViewProto): if stream_source: feature_view.stream_source = stream_source + # Instead of passing in a schema, we set the features and entity columns. 
+ feature_view.features = [ + Field.from_proto(field_proto) + for field_proto in feature_view_proto.spec.features + ] + feature_view.entity_columns = [ + Field.from_proto(field_proto) + for field_proto in feature_view_proto.spec.entity_columns + ] + # FeatureViewProjections are not saved in the FeatureView proto. # Create the default projection. feature_view.projection = FeatureViewProjection.from_definition(feature_view) diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py index 5c0f0e1d28..d3f2d94532 100644 --- a/sdk/python/feast/inference.py +++ b/sdk/python/feast/inference.py @@ -4,70 +4,13 @@ from feast import BigQuerySource, Entity, FileSource, RedshiftSource, SnowflakeSource from feast.data_source import DataSource, PushSource, RequestSource from feast.errors import RegistryInferenceFailure -from feast.feature_view import FeatureView +from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_NAME, FeatureView from feast.field import Field, from_value_type from feast.repo_config import RepoConfig +from feast.types import String from feast.value_type import ValueType -def update_entities_with_inferred_types_from_feature_views( - entities: List[Entity], feature_views: List[FeatureView], config: RepoConfig -) -> None: - """ - Infers the types of the entities by examining the schemas of feature view batch sources. - - Args: - entities: The entities to be updated. - feature_views: A list containing feature views associated with the entities. - config: The config for the current feature store. 
- """ - incomplete_entities = { - entity.name: entity - for entity in entities - if entity.value_type == ValueType.UNKNOWN - } - incomplete_entities_keys = incomplete_entities.keys() - - for view in feature_views: - if not (incomplete_entities_keys & set(view.entities)): - continue # skip if view doesn't contain any entities that need inference - - col_names_and_types = list( - view.batch_source.get_table_column_names_and_types(config) - ) - for entity_name in view.entities: - if entity_name in incomplete_entities: - entity = incomplete_entities[entity_name] - - # get entity information from information extracted from the view batch source - extracted_entity_name_type_pairs = list( - filter(lambda tup: tup[0] == entity.join_key, col_names_and_types,) - ) - if len(extracted_entity_name_type_pairs) == 0: - # Doesn't mention inference error because would also be an error without inferencing - raise ValueError( - f"""No column in the batch source for the {view.name} feature view matches - its entity's name.""" - ) - - inferred_value_type = view.batch_source.source_datatype_to_feast_value_type()( - extracted_entity_name_type_pairs[0][1] - ) - - if ( - entity.value_type != ValueType.UNKNOWN - and entity.value_type != inferred_value_type - ) or (len(extracted_entity_name_type_pairs) > 1): - raise RegistryInferenceFailure( - "Entity", - f"""Entity value_type inference failed for {entity_name} entity. - Multiple viable matches. 
- """, - ) - - entity.value_type = inferred_value_type - - def update_data_sources_with_inferred_event_timestamp_col( data_sources: List[DataSource], config: RepoConfig ) -> None: @@ -140,57 +83,115 @@ def update_data_sources_with_inferred_event_timestamp_col( ) -def update_feature_views_with_inferred_features( +def update_feature_views_with_inferred_features_and_entities( fvs: List[FeatureView], entities: List[Entity], config: RepoConfig ) -> None: """ - Infers the set of features associated to each FeatureView and updates the FeatureView with those features. - Inference occurs through considering each column of the underlying data source as a feature except columns that are - associated with the data source's timestamp columns and the FeatureView's entity columns. + Infers the set of features and entities associated with each feature view and updates + the feature view with those features and entities. Columns whose names match a join key + of an entity are considered to be entity columns; all other columns except designated + timestamp columns, are considered to be feature columns. Args: fvs: The feature views to be updated. entities: A list containing entities associated with the feature views. config: The config for the current feature store. """ - entity_name_to_join_key_map = {entity.name: entity.join_key for entity in entities} - join_keys = entity_name_to_join_key_map.values() + entity_name_to_entity_map = {e.name: e for e in entities} + entity_name_to_join_keys_map = {e.name: e.join_keys for e in entities} + all_join_keys = [ + join_key + for join_key_list in entity_name_to_join_keys_map.values() + for join_key in join_key_list + ] for fv in fvs: - # First drop all Entity fields. Then infer features if necessary. 
- fv.schema = [field for field in fv.schema if field.name not in join_keys] - fv.features = [field for field in fv.features if field.name not in join_keys] + # Fields whose names match a join key are considered to be entity columns; all + # other fields are considered to be feature columns. + for field in fv.schema: + if field.name in all_join_keys: + # Do not override a preexisting field with the same name. + if field.name not in [ + entity_column.name for entity_column in fv.entity_columns + ]: + fv.entity_columns.append(field) + else: + if field.name not in [feature.name for feature in fv.features]: + fv.features.append(field) + + # Since the `value_type` parameter has not yet been fully deprecated for + # entities, we respect the `value_type` attribute if it still exists. + for entity_name in fv.entities: + entity = entity_name_to_entity_map[entity_name] + if ( + entity_name + not in [entity_column.name for entity_column in fv.entity_columns] + and entity.value_type != ValueType.UNKNOWN + ): + fv.entity_columns.append( + Field( + name=entity.join_key, dtype=from_value_type(entity.value_type), + ) + ) + + # Handle EFV separately here. Specifically, that means if we have an EFV, + # we need to add a field to entity_columns. + if len(fv.entities) == 1 and fv.entities[0] == DUMMY_ENTITY_NAME: + fv.entity_columns.append(Field(name=DUMMY_ENTITY_ID, dtype=String)) + + # Run inference if either (a) there are fewer entity fields than expected or + # (b) there are no feature fields. 
+ run_inference = len(fv.features) == 0 + num_expected_join_keys = sum( + [ + len(entity_name_to_join_keys_map[entity_name]) + for entity_name in fv.entities + ] + ) + if len(fv.entity_columns) < num_expected_join_keys: + run_inference = True + + if run_inference: + join_keys = set( + [ + join_key + for entity_name in fv.entities + for join_key in entity_name_to_join_keys_map[entity_name] + ] + ) - if not fv.features: columns_to_exclude = { fv.batch_source.timestamp_field, fv.batch_source.created_timestamp_column, - } | { - entity_name_to_join_key_map[entity_name] for entity_name in fv.entities } + for column in columns_to_exclude: + if column in fv.batch_source.field_mapping: + columns_to_exclude.remove(column) + columns_to_exclude.add(fv.batch_source.field_mapping[column]) - if fv.batch_source.timestamp_field in fv.batch_source.field_mapping: - columns_to_exclude.add( - fv.batch_source.field_mapping[fv.batch_source.timestamp_field] - ) - if ( - fv.batch_source.created_timestamp_column - in fv.batch_source.field_mapping - ): - columns_to_exclude.add( - fv.batch_source.field_mapping[ - fv.batch_source.created_timestamp_column - ] - ) + table_column_names_and_types = fv.batch_source.get_table_column_names_and_types( + config + ) - for ( - col_name, - col_datatype, - ) in fv.batch_source.get_table_column_names_and_types(config): - if col_name not in columns_to_exclude and not re.match( - "^__|__$", - col_name, # double underscores often signal an internal-use column - ): + for col_name, col_datatype in table_column_names_and_types: + if col_name in columns_to_exclude: + continue + elif col_name in join_keys: + field = Field( + name=col_name, + dtype=from_value_type( + fv.batch_source.source_datatype_to_feast_value_type()( + col_datatype + ) + ), + ) + if field.name not in [ + entity_column.name for entity_column in fv.entity_columns + ]: + fv.entity_columns.append(field) + elif not re.match( + "^__|__$", col_name + ): # double underscores often signal an internal-use 
column feature_name = ( fv.batch_source.field_mapping[col_name] if col_name in fv.batch_source.field_mapping @@ -204,10 +205,8 @@ def update_feature_views_with_inferred_features( ) ), ) - # Note that schema and features are two different attributes of a - # FeatureView, and that features should be present in both. - fv.schema.append(field) - fv.features.append(field) + if field.name not in [feature.name for feature in fv.features]: + fv.features.append(field) if not fv.features: raise RegistryInferenceFailure( diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index a85cd880b1..ca46133621 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -199,10 +199,9 @@ def evaluate_historical_retrieval(): # Build a list of entity columns to join on (from the right table) join_keys = [] - for entity_name in feature_view.entities: - entity = registry.get_entity(entity_name, project) + for entity_column in feature_view.entity_columns: join_key = feature_view.projection.join_key_map.get( - entity.join_key, entity.join_key + entity_column.name, entity_column.name ) join_keys.append(join_key) diff --git a/sdk/python/feast/infra/offline_stores/offline_utils.py b/sdk/python/feast/infra/offline_stores/offline_utils.py index b6c3d300d4..dad0ca5b78 100644 --- a/sdk/python/feast/infra/offline_stores/offline_utils.py +++ b/sdk/python/feast/infra/offline_stores/offline_utils.py @@ -60,11 +60,9 @@ def get_expected_join_keys( ) -> Set[str]: join_keys = set() for feature_view in feature_views: - entities = feature_view.entities - for entity_name in entities: - entity = registry.get_entity(entity_name, project) + for entity_column in feature_view.entity_columns: join_key = feature_view.projection.join_key_map.get( - entity.join_key, entity.join_key + entity_column.name, entity_column.name ) join_keys.add(join_key) return join_keys @@ -114,14 +112,14 @@ def 
get_feature_view_query_context( query_context = [] for feature_view, features in feature_views_to_feature_map.items(): - join_keys, entity_selections = [], [] - for entity_name in feature_view.entities: - entity = registry.get_entity(entity_name, project) + join_keys: List[str] = [] + entity_selections: List[str] = [] + for entity_column in feature_view.entity_columns: join_key = feature_view.projection.join_key_map.get( - entity.join_key, entity.join_key + entity_column.name, entity_column.name ) join_keys.append(join_key) - entity_selections.append(f"{entity.join_key} AS {join_key}") + entity_selections.append(f"{entity_column.name} AS {join_key}") if isinstance(feature_view.ttl, timedelta): ttl_seconds = int(feature_view.ttl.total_seconds()) diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index 9ceceff0ac..1a8eedb21e 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -106,9 +106,9 @@ def update( (usually this happens when the last feature view that was using specific compound key is deleted) and remove all features attached to this "join_keys". """ - join_keys_to_keep = set(tuple(table.entities) for table in tables_to_keep) + join_keys_to_keep = set(tuple(table.join_keys) for table in tables_to_keep) - join_keys_to_delete = set(tuple(table.entities) for table in tables_to_delete) + join_keys_to_delete = set(tuple(table.join_keys) for table in tables_to_delete) for join_keys in join_keys_to_delete - join_keys_to_keep: self.delete_entity_values(config, list(join_keys)) @@ -122,7 +122,7 @@ def teardown( """ We delete the keys in redis for tables/views being removed. 
""" - join_keys_to_delete = set(tuple(table.entities) for table in tables) + join_keys_to_delete = set(tuple(table.join_keys) for table in tables) for join_keys in join_keys_to_delete: self.delete_entity_values(config, list(join_keys)) diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 6364297b1e..98d12d9b5e 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -159,11 +159,16 @@ def materialize_single_feature_view( if feature_view.batch_source.field_mapping is not None: table = _run_field_mapping(table, feature_view.batch_source.field_mapping) - join_keys = {entity.join_key: entity.value_type for entity in entities} + join_key_to_value_type = { + entity.name: entity.dtype.to_value_type() + for entity in feature_view.entity_columns + } with tqdm_builder(table.num_rows) as pbar: for batch in table.to_batches(DEFAULT_BATCH_SIZE): - rows_to_write = _convert_arrow_to_proto(batch, feature_view, join_keys) + rows_to_write = _convert_arrow_to_proto( + batch, feature_view, join_key_to_value_type + ) self.online_write_batch( self.repo_config, feature_view, diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index f8c2a4482f..cee8cb2d53 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -388,7 +388,7 @@ def _convert_arrow_to_proto( table = table.to_batches()[0] columns = [ - (field.name, field.dtype.to_value_type()) for field in feature_view.schema + (field.name, field.dtype.to_value_type()) for field in feature_view.features ] + list(join_keys.items()) proto_values_by_column = { diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index d2cec18e52..2d41de5291 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -647,7 +647,7 @@ def decorator(user_function): def 
feature_view_to_batch_feature_view(fv: FeatureView) -> BatchFeatureView: - return BatchFeatureView( + bfv = BatchFeatureView( name=fv.name, entities=fv.entities, ttl=fv.ttl, @@ -657,3 +657,7 @@ def feature_view_to_batch_feature_view(fv: FeatureView) -> BatchFeatureView: schema=fv.schema, source=fv.source, ) + + bfv.features = copy.copy(fv.features) + bfv.entities = copy.copy(fv.entities) + return bfv diff --git a/sdk/python/feast/templates/aws/driver_repo.py b/sdk/python/feast/templates/aws/driver_repo.py index 5188f57cf8..a6f31f07b4 100644 --- a/sdk/python/feast/templates/aws/driver_repo.py +++ b/sdk/python/feast/templates/aws/driver_repo.py @@ -1,6 +1,6 @@ from datetime import timedelta -from feast import Entity, FeatureView, Field, RedshiftSource, ValueType +from feast import Entity, FeatureView, Field, RedshiftSource from feast.types import Float32, Int64 # Define an entity for the driver. Entities can be thought of as primary keys used to @@ -13,8 +13,6 @@ # features can be looked up. The join keys are also used to join feature # tables/views when building feature vectors join_keys=["driver_id"], - # The storage level type for an entity - value_type=ValueType.INT64, ) # Indicates a data source from which feature values can be retrieved. Sources are queried when building training @@ -41,7 +39,7 @@ # The list of entities specifies the keys required for joining or looking # up features from this feature view. The reference provided in this field # correspond to the name of a defined entity (or entities) - entities=["driver"], + entities=[driver], # The timedelta is the maximum age that each feature value may have # relative to its lookup time. For historical features (used in training), # TTL is relative to each timestamp provided in the entity dataframe. 
diff --git a/sdk/python/feast/templates/gcp/driver_repo.py b/sdk/python/feast/templates/gcp/driver_repo.py index 7d137f996b..b3bd868f14 100644 --- a/sdk/python/feast/templates/gcp/driver_repo.py +++ b/sdk/python/feast/templates/gcp/driver_repo.py @@ -1,6 +1,6 @@ from datetime import timedelta -from feast import BigQuerySource, Entity, FeatureView, Field, ValueType +from feast import BigQuerySource, Entity, FeatureView, Field from feast.types import Float32, Int64 # Define an entity for the driver. Entities can be thought of as primary keys used to @@ -13,8 +13,6 @@ # features can be looked up. The join keys are also used to join feature # tables/views when building feature vectors join_keys=["driver_id"], - # The storage level type for an entity - value_type=ValueType.INT64, ) # Indicates a data source from which feature values can be retrieved. Sources are queried when building training @@ -39,7 +37,7 @@ # The list of entities specifies the keys required for joining or looking # up features from this feature view. The reference provided in this field # correspond to the name of a defined entity (or entities) - entities=["driver"], + entities=[driver], # The timedelta is the maximum age that each feature value may have # relative to its lookup time. For historical features (used in training), # TTL is relative to each timestamp provided in the entity dataframe. diff --git a/sdk/python/feast/templates/hbase/example.py b/sdk/python/feast/templates/hbase/example.py index 1d441e0e99..b34696185b 100644 --- a/sdk/python/feast/templates/hbase/example.py +++ b/sdk/python/feast/templates/hbase/example.py @@ -2,7 +2,7 @@ from datetime import timedelta -from feast import Entity, FeatureView, Field, FileSource, ValueType +from feast import Entity, FeatureView, Field, FileSource from feast.types import Float32, Int64 # Read data from parquet files. Parquet is convenient for local development mode. For @@ -16,7 +16,7 @@ # Define an entity for the driver. 
You can think of entity as a primary key used to # fetch features. -driver = Entity(name="driver", join_keys=["driver_id"], value_type=ValueType.INT64,) +driver = Entity(name="driver", join_keys=["driver_id"]) # Our parquet files contain sample data that includes a driver_id column, timestamps and # three feature column. Here we define a Feature View that will allow us to serve this diff --git a/sdk/python/feast/templates/local/example.py b/sdk/python/feast/templates/local/example.py index 1d441e0e99..26821b8a93 100644 --- a/sdk/python/feast/templates/local/example.py +++ b/sdk/python/feast/templates/local/example.py @@ -2,7 +2,7 @@ from datetime import timedelta -from feast import Entity, FeatureView, Field, FileSource, ValueType +from feast import Entity, FeatureView, Field, FileSource from feast.types import Float32, Int64 # Read data from parquet files. Parquet is convenient for local development mode. For @@ -16,14 +16,14 @@ # Define an entity for the driver. You can think of entity as a primary key used to # fetch features. -driver = Entity(name="driver", join_keys=["driver_id"], value_type=ValueType.INT64,) +driver = Entity(name="driver", join_keys=["driver_id"]) # Our parquet files contain sample data that includes a driver_id column, timestamps and # three feature column. Here we define a Feature View that will allow us to serve this # data to our model online. 
driver_hourly_stats_view = FeatureView( name="driver_hourly_stats", - entities=["driver"], + entities=[driver], ttl=timedelta(days=1), schema=[ Field(name="conv_rate", dtype=Float32), diff --git a/sdk/python/feast/templates/postgres/driver_repo.py b/sdk/python/feast/templates/postgres/driver_repo.py index 34bc0022e2..4096943bb7 100644 --- a/sdk/python/feast/templates/postgres/driver_repo.py +++ b/sdk/python/feast/templates/postgres/driver_repo.py @@ -18,7 +18,7 @@ driver_stats_fv = FeatureView( name="driver_hourly_stats", - entities=["driver_id"], + entities=[driver], ttl=timedelta(weeks=52), schema=[ Field(name="conv_rate", dtype=Float32), diff --git a/sdk/python/feast/templates/snowflake/driver_repo.py b/sdk/python/feast/templates/snowflake/driver_repo.py index ecccb9863b..873e2b163e 100644 --- a/sdk/python/feast/templates/snowflake/driver_repo.py +++ b/sdk/python/feast/templates/snowflake/driver_repo.py @@ -43,7 +43,7 @@ # The list of entities specifies the keys required for joining or looking # up features from this feature view. The reference provided in this field # correspond to the name of a defined entity (or entities) - entities=["driver"], + entities=[driver], # The timedelta is the maximum age that each feature value may have # relative to its lookup time. For historical features (used in training), # TTL is relative to each timestamp provided in the entity dataframe. 
diff --git a/sdk/python/feast/templates/spark/example.py b/sdk/python/feast/templates/spark/example.py index 58f3df740f..41f86ef3c1 100644 --- a/sdk/python/feast/templates/spark/example.py +++ b/sdk/python/feast/templates/spark/example.py @@ -5,7 +5,7 @@ from datetime import timedelta from pathlib import Path -from feast import Entity, FeatureView, Field, ValueType +from feast import Entity, FeatureView, Field from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import ( SparkSource, ) @@ -16,10 +16,8 @@ # Entity definitions -driver = Entity(name="driver", value_type=ValueType.INT64, description="driver id",) -customer = Entity( - name="customer", value_type=ValueType.INT64, description="customer id", -) +driver = Entity(name="driver", description="driver id",) +customer = Entity(name="customer", description="customer id",) # Sources driver_hourly_stats = SparkSource( @@ -40,7 +38,7 @@ # Feature Views driver_hourly_stats_view = FeatureView( name="driver_hourly_stats", - entities=["driver"], + entities=[driver], ttl=timedelta(days=7), schema=[ Field(name="conv_rate", dtype=Float32), @@ -53,7 +51,7 @@ ) customer_daily_profile_view = FeatureView( name="customer_daily_profile", - entities=["customer"], + entities=[customer], ttl=timedelta(days=7), schema=[ Field(name="current_balance", dtype=Float32), diff --git a/sdk/python/tests/data/data_creator.py b/sdk/python/tests/data/data_creator.py index e08597b67b..186c39b9ef 100644 --- a/sdk/python/tests/data/data_creator.py +++ b/sdk/python/tests/data/data_creator.py @@ -4,11 +4,11 @@ import pandas as pd from pytz import timezone, utc -from feast.value_type import ValueType +from feast.types import FeastType, Float32, Int32, Int64, String def create_dataset( - entity_type: ValueType = ValueType.INT32, + entity_type: FeastType = Int32, feature_dtype: str = None, feature_is_list: bool = False, list_has_empty_list: bool = False, @@ -16,7 +16,7 @@ def create_dataset( now = 
datetime.utcnow().replace(microsecond=0, second=0, minute=0) ts = pd.Timestamp(now).round("ms") data = { - "driver_id": get_entities_for_value_type(entity_type), + "driver_id": get_entities_for_feast_type(entity_type), "value": get_feature_values_for_dtype( feature_dtype, feature_is_list, list_has_empty_list ), @@ -37,14 +37,14 @@ def create_dataset( return pd.DataFrame.from_dict(data) -def get_entities_for_value_type(value_type: ValueType) -> List: - value_type_map: Dict[ValueType, List] = { - ValueType.INT32: [1, 2, 1, 3, 3], - ValueType.INT64: [1, 2, 1, 3, 3], - ValueType.FLOAT: [1.0, 2.0, 1.0, 3.0, 3.0], - ValueType.STRING: ["1", "2", "1", "3", "3"], +def get_entities_for_feast_type(feast_type: FeastType) -> List: + feast_type_map: Dict[FeastType, List] = { + Int32: [1, 2, 1, 3, 3], + Int64: [1, 2, 1, 3, 3], + Float32: [1.0, 2.0, 1.0, 3.0, 3.0], + String: ["1", "2", "1", "3", "3"], } - return value_type_map[value_type] + return feast_type_map[feast_type] def get_feature_values_for_dtype( diff --git a/sdk/python/tests/doctest/test_all.py b/sdk/python/tests/doctest/test_all.py index 65d5f3da28..31f181ad53 100644 --- a/sdk/python/tests/doctest/test_all.py +++ b/sdk/python/tests/doctest/test_all.py @@ -11,15 +11,13 @@ def setup_feature_store(): """Prepares the local environment for a FeatureStore docstring test.""" from datetime import datetime, timedelta - from feast import Entity, FeatureStore, FeatureView, Field, FileSource, ValueType + from feast import Entity, FeatureStore, FeatureView, Field, FileSource from feast.repo_operations import init_repo from feast.types import Float32, Int64 init_repo("feature_repo", "local") fs = FeatureStore(repo_path="feature_repo") - driver = Entity( - name="driver_id", value_type=ValueType.INT64, description="driver id", - ) + driver = Entity(name="driver_id", description="driver id",) driver_hourly_stats = FileSource( path="feature_repo/data/driver_stats.parquet", timestamp_field="event_timestamp", @@ -27,7 +25,7 @@ def 
setup_feature_store(): ) driver_hourly_stats_view = FeatureView( name="driver_hourly_stats", - entities=["driver_id"], + entities=[driver], ttl=timedelta(seconds=86400 * 1), schema=[ Field(name="conv_rate", dtype=Float32), diff --git a/sdk/python/tests/example_repos/example_feature_repo_1.py b/sdk/python/tests/example_repos/example_feature_repo_1.py index d8b6d7c89b..8d6d96d9ef 100644 --- a/sdk/python/tests/example_repos/example_feature_repo_1.py +++ b/sdk/python/tests/example_repos/example_feature_repo_1.py @@ -1,14 +1,6 @@ from datetime import timedelta -from feast import ( - BigQuerySource, - Entity, - FeatureService, - FeatureView, - Field, - PushSource, - ValueType, -) +from feast import BigQuerySource, Entity, FeatureService, FeatureView, Field, PushSource from feast.types import Float32, Int64, String driver_locations_source = BigQuerySource( @@ -46,22 +38,24 @@ driver = Entity( name="driver", # The name is derived from this argument, not object name. join_keys=["driver_id"], - value_type=ValueType.INT64, description="driver id", ) customer = Entity( name="customer", # The name is derived from this argument, not object name. 
join_keys=["customer_id"], - value_type=ValueType.STRING, ) driver_locations = FeatureView( name="driver_locations", - entities=["driver"], + entities=[driver], ttl=timedelta(days=1), - schema=[Field(name="lat", dtype=Float32), Field(name="lon", dtype=String)], + schema=[ + Field(name="lat", dtype=Float32), + Field(name="lon", dtype=String), + Field(name="driver_id", dtype=Int64), + ], online=True, batch_source=driver_locations_source, tags={}, @@ -69,11 +63,12 @@ pushed_driver_locations = FeatureView( name="pushed_driver_locations", - entities=["driver"], + entities=[driver], ttl=timedelta(days=1), schema=[ Field(name="driver_lat", dtype=Float32), Field(name="driver_long", dtype=String), + Field(name="driver_id", dtype=Int64), ], online=True, stream_source=driver_locations_push_source, @@ -82,12 +77,13 @@ customer_profile = FeatureView( name="customer_profile", - entities=["customer"], + entities=[customer], ttl=timedelta(days=1), schema=[ Field(name="avg_orders_day", dtype=Float32), Field(name="name", dtype=String), Field(name="age", dtype=Int64), + Field(name="customer_id", dtype=String), ], online=True, batch_source=customer_profile_source, @@ -96,9 +92,13 @@ customer_driver_combined = FeatureView( name="customer_driver_combined", - entities=["customer", "driver"], + entities=[customer, driver], ttl=timedelta(days=1), - schema=[Field(name="trips", dtype=Int64)], + schema=[ + Field(name="trips", dtype=Int64), + Field(name="driver_id", dtype=Int64), + Field(name="customer_id", dtype=String), + ], online=True, batch_source=customer_driver_combined_source, tags={}, diff --git a/sdk/python/tests/example_repos/example_feature_repo_2.py b/sdk/python/tests/example_repos/example_feature_repo_2.py index 1ca7cc3805..75e99aa4b9 100644 --- a/sdk/python/tests/example_repos/example_feature_repo_2.py +++ b/sdk/python/tests/example_repos/example_feature_repo_2.py @@ -1,6 +1,6 @@ from datetime import timedelta -from feast import Entity, FeatureView, Field, FileSource, ValueType 
+from feast import Entity, FeatureView, Field, FileSource from feast.types import Float32, Int32, Int64 driver_hourly_stats = FileSource( @@ -9,17 +9,18 @@ created_timestamp_column="created", ) -driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",) +driver = Entity(name="driver_id", description="driver id",) driver_hourly_stats_view = FeatureView( name="driver_hourly_stats", - entities=["driver_id"], + entities=[driver], ttl=timedelta(days=1), schema=[ Field(name="conv_rate", dtype=Float32), Field(name="acc_rate", dtype=Float32), Field(name="avg_daily_trips", dtype=Int64), + Field(name="driver_id", dtype=Int32), ], online=True, batch_source=driver_hourly_stats, diff --git a/sdk/python/tests/example_repos/example_feature_repo_with_duplicated_featureview_names.py b/sdk/python/tests/example_repos/example_feature_repo_with_duplicated_featureview_names.py index cbcc3ad172..4b079999ed 100644 --- a/sdk/python/tests/example_repos/example_feature_repo_with_duplicated_featureview_names.py +++ b/sdk/python/tests/example_repos/example_feature_repo_with_duplicated_featureview_names.py @@ -1,14 +1,16 @@ from datetime import timedelta -from feast import FeatureView, FileSource +from feast import Entity, FeatureView, FileSource driver_hourly_stats = FileSource( path="driver_stats.parquet", # this parquet is not real and will not be read ) +driver = Entity(name="driver_id", description="driver id", join_keys=["driver"],) + driver_hourly_stats_view = FeatureView( name="driver_hourly_stats", # Intentionally use the same FeatureView name - entities=["driver_id"], + entities=[driver], online=False, source=driver_hourly_stats, ttl=timedelta(days=1), @@ -17,7 +19,7 @@ driver_hourly_stats_view_dup1 = FeatureView( name="driver_hourly_stats", # Intentionally use the same FeatureView name - entities=["driver_id"], + entities=[driver], online=False, source=driver_hourly_stats, ttl=timedelta(days=1), diff --git 
a/sdk/python/tests/example_repos/example_feature_repo_with_entity_join_key.py b/sdk/python/tests/example_repos/example_feature_repo_with_entity_join_key.py index ba18cf84ba..4bc0923e19 100644 --- a/sdk/python/tests/example_repos/example_feature_repo_with_entity_join_key.py +++ b/sdk/python/tests/example_repos/example_feature_repo_with_entity_join_key.py @@ -1,6 +1,6 @@ from datetime import timedelta -from feast import Entity, FeatureView, Field, FileSource, ValueType +from feast import Entity, FeatureView, Field, FileSource from feast.types import Float32, Int64 driver_hourly_stats = FileSource( @@ -11,17 +11,12 @@ # The join key here is deliberately different from the parquet file to test the failure path. -driver = Entity( - name="driver_id", - value_type=ValueType.INT64, - description="driver id", - join_keys=["driver"], -) +driver = Entity(name="driver_id", description="driver id", join_keys=["driver"],) driver_hourly_stats_view = FeatureView( name="driver_hourly_stats", - entities=["driver_id"], + entities=[driver], ttl=timedelta(days=1), schema=[ Field(name="conv_rate", dtype=Float32), diff --git a/sdk/python/tests/integration/e2e/test_usage_e2e.py b/sdk/python/tests/integration/e2e/test_usage_e2e.py index 12c1eb8628..53e4a32a82 100644 --- a/sdk/python/tests/integration/e2e/test_usage_e2e.py +++ b/sdk/python/tests/integration/e2e/test_usage_e2e.py @@ -19,7 +19,7 @@ import pytest -from feast import Entity, RepoConfig, ValueType +from feast import Entity, RepoConfig from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig @@ -60,7 +60,6 @@ def test_usage_on(dummy_exporter, enabling_toggle): entity = Entity( name="driver_car_id", description="Car driver id", - value_type=ValueType.STRING, tags={"team": "matchmaking"}, ) @@ -99,7 +98,6 @@ def test_usage_off(dummy_exporter, enabling_toggle): entity = Entity( name="driver_car_id", description="Car driver id", - value_type=ValueType.STRING, tags={"team": "matchmaking"}, ) 
test_feature_store.apply([entity]) diff --git a/sdk/python/tests/integration/feature_repos/universal/entities.py b/sdk/python/tests/integration/feature_repos/universal/entities.py index b7a7583f1b..66989b0646 100644 --- a/sdk/python/tests/integration/feature_repos/universal/entities.py +++ b/sdk/python/tests/integration/feature_repos/universal/entities.py @@ -1,22 +1,21 @@ -from feast import Entity, ValueType +from feast import Entity -def driver(value_type: ValueType = ValueType.INT64): +def driver(): return Entity( name="driver", # The name is derived from this argument, not object name. - value_type=value_type, description="driver id", join_keys=["driver_id"], ) def customer(): - return Entity(name="customer_id", value_type=ValueType.INT64) + return Entity(name="customer_id") def location(): - return Entity(name="location_id", value_type=ValueType.INT64) + return Entity(name="location_id") def item(): - return Entity(name="item_id", value_type=ValueType.INT64) + return Entity(name="item_id") diff --git a/sdk/python/tests/integration/feature_repos/universal/feature_views.py b/sdk/python/tests/integration/feature_repos/universal/feature_views.py index 26c2513995..b93ad987fa 100644 --- a/sdk/python/tests/integration/feature_repos/universal/feature_views.py +++ b/sdk/python/tests/integration/feature_repos/universal/feature_views.py @@ -14,8 +14,13 @@ ValueType, ) from feast.data_source import DataSource, RequestSource -from feast.types import Array, FeastType, Float32, Float64, Int32 -from tests.integration.feature_repos.universal.entities import location +from feast.types import Array, FeastType, Float32, Float64, Int32, Int64 +from tests.integration.feature_repos.universal.entities import ( + customer, + driver, + item, + location, +) def driver_feature_view( @@ -23,11 +28,14 @@ def driver_feature_view( name="test_correctness", infer_features: bool = False, dtype: FeastType = Float32, + entity_type: FeastType = Int64, ) -> FeatureView: + d = driver() return 
FeatureView( name=name, - entities=["driver"], - schema=None if infer_features else [Field(name="value", dtype=dtype)], + entities=[d], + schema=[Field(name=d.join_key, dtype=entity_type)] + + ([] if infer_features else [Field(name="value", dtype=dtype)]), ttl=timedelta(days=5), source=data_source, ) @@ -138,7 +146,7 @@ def create_similarity_request_source(): def create_item_embeddings_feature_view(source, infer_features: bool = False): item_embeddings_feature_view = FeatureView( name="item_embeddings", - entities=["item"], + entities=[item()], schema=None if infer_features else [ @@ -156,7 +164,7 @@ def create_item_embeddings_batch_feature_view( ) -> BatchFeatureView: item_embeddings_feature_view = BatchFeatureView( name="item_embeddings", - entities=["item"], + entities=[item()], schema=None if infer_features else [ @@ -170,15 +178,19 @@ def create_item_embeddings_batch_feature_view( def create_driver_hourly_stats_feature_view(source, infer_features: bool = False): + # TODO(felixwang9817): Figure out why not adding an entity field here + # breaks type tests. 
+ d = driver() driver_stats_feature_view = FeatureView( name="driver_stats", - entities=["driver"], + entities=[d], schema=None if infer_features else [ Field(name="conv_rate", dtype=Float32), Field(name="acc_rate", dtype=Float32), Field(name="avg_daily_trips", dtype=Int32), + Field(name=d.join_key, dtype=Int64), ], source=source, ttl=timedelta(hours=2), @@ -191,7 +203,7 @@ def create_driver_hourly_stats_batch_feature_view( ) -> BatchFeatureView: driver_stats_feature_view = BatchFeatureView( name="driver_stats", - entities=["driver"], + entities=[driver()], schema=None if infer_features else [ @@ -208,7 +220,7 @@ def create_driver_hourly_stats_batch_feature_view( def create_customer_daily_profile_feature_view(source, infer_features: bool = False): customer_profile_feature_view = FeatureView( name="customer_profile", - entities=["customer_id"], + entities=[customer()], schema=None if infer_features else [ @@ -242,10 +254,13 @@ def create_global_stats_feature_view(source, infer_features: bool = False): def create_order_feature_view(source, infer_features: bool = False): return FeatureView( name="order", - entities=["driver", "customer_id"], + entities=[customer(), driver()], schema=None if infer_features - else [Field(name="order_is_success", dtype=Int32)], + else [ + Field(name="order_is_success", dtype=Int32), + Field(name="driver_id", dtype=Int64), + ], source=source, ttl=timedelta(days=2), ) @@ -255,7 +270,12 @@ def create_location_stats_feature_view(source, infer_features: bool = False): location_stats_feature_view = FeatureView( name="location_stats", entities=[location()], - schema=None if infer_features else [Field(name="temperature", dtype=Int32)], + schema=None + if infer_features + else [ + Field(name="temperature", dtype=Int32), + Field(name="location_id", dtype=Int64), + ], source=source, ttl=timedelta(days=2), ) @@ -279,9 +299,11 @@ def create_pushable_feature_view(batch_source: DataSource): ) return FeatureView( name="pushable_location_stats", - 
entities=["location_id"], - # Test that Features still work for FeatureViews. - features=[Feature(name="temperature", dtype=ValueType.INT32)], + entities=[location()], + schema=[ + Field(name="temperature", dtype=Int32), + Field(name="location_id", dtype=Int64), + ], ttl=timedelta(days=2), source=push_source, ) diff --git a/sdk/python/tests/integration/offline_store/test_feature_logging.py b/sdk/python/tests/integration/offline_store/test_feature_logging.py index f15eb8a849..43a04639f9 100644 --- a/sdk/python/tests/integration/offline_store/test_feature_logging.py +++ b/sdk/python/tests/integration/offline_store/test_feature_logging.py @@ -21,7 +21,11 @@ UniversalDatasets, construct_universal_feature_views, ) -from tests.integration.feature_repos.universal.entities import driver +from tests.integration.feature_repos.universal.entities import ( + customer, + driver, + location, +) from tests.integration.feature_repos.universal.feature_views import conv_rate_plus_100 @@ -33,7 +37,7 @@ def test_feature_service_logging(environment, universal_data_sources): (_, datasets, data_sources) = universal_data_sources feature_views = construct_universal_feature_views(data_sources) - store.apply([driver(), *feature_views.values()]) + store.apply([customer(), driver(), location(), *feature_views.values()]) logs_df = prepare_logs(datasets) diff --git a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py index b62f7cda24..c99d19b096 100644 --- a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py +++ b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py @@ -22,7 +22,6 @@ DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL, ) from feast.types import Int32 -from feast.value_type import ValueType from tests.integration.feature_repos.repo_configuration import ( construct_universal_feature_views, table_name_from_data_source, @@ 
-689,10 +688,10 @@ def test_historical_features_from_bigquery_sources_containing_backfills(environm created_timestamp_column="created", ) - driver = Entity(name="driver", join_keys=["driver_id"], value_type=ValueType.INT64) + driver = Entity(name="driver", join_keys=["driver_id"]) driver_fv = FeatureView( name="driver_stats", - entities=["driver"], + entities=[driver], schema=[Field(name="avg_daily_trips", dtype=Int32)], batch_source=driver_stats_data_source, ttl=None, diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index f4440dbfbc..2af5fe543d 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -13,7 +13,7 @@ import requests from botocore.exceptions import BotoCoreError -from feast import Entity, FeatureService, FeatureView, Field, ValueType +from feast import Entity, FeatureService, FeatureView, Field from feast.errors import ( FeatureNameCollisionError, RequestDataNotFoundInEntityRowsException, @@ -115,13 +115,13 @@ def test_write_to_online_store_event_check(local_redis_environment): } dataframe_source = pd.DataFrame(data) with prep_file_source(df=dataframe_source, timestamp_field="ts_1") as file_source: - e = Entity(name="id", value_type=ValueType.STRING) + e = Entity(name="id") # Create Feature View fv1 = FeatureView( name="feature_view_123", schema=[Field(name="string_col", dtype=String)], - entities=["id"], + entities=[e], batch_source=file_source, ttl=timedelta(minutes=5), ) diff --git a/sdk/python/tests/integration/registration/test_feature_store.py b/sdk/python/tests/integration/registration/test_feature_store.py index db4c6700ce..88a4b9f249 100644 --- a/sdk/python/tests/integration/registration/test_feature_store.py +++ b/sdk/python/tests/integration/registration/test_feature_store.py @@ -27,10 +27,8 @@ from feast.infra.offline_stores.file import 
FileOfflineStoreConfig from feast.infra.online_stores.dynamodb import DynamoDBOnlineStoreConfig from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig -from feast.protos.feast.types import Value_pb2 as ValueProto from feast.repo_config import RepoConfig from feast.types import Array, Bytes, Float64, Int64, String -from feast.value_type import ValueType from tests.utils.data_source_utils import ( prep_file_source, simple_bq_source_using_query_arg, @@ -93,10 +91,7 @@ def feature_store_with_s3_registry(): ) def test_apply_entity_success(test_feature_store): entity = Entity( - name="driver_car_id", - description="Car driver id", - value_type=ValueType.STRING, - tags={"team": "matchmaking"}, + name="driver_car_id", description="Car driver id", tags={"team": "matchmaking"}, ) # Register Entity @@ -108,7 +103,6 @@ def test_apply_entity_success(test_feature_store): assert ( len(entities) == 1 and entity.name == "driver_car_id" - and entity.value_type == ValueType(ValueProto.ValueType.STRING) and entity.description == "Car driver id" and "team" in entity.tags and entity.tags["team"] == "matchmaking" @@ -127,10 +121,7 @@ def test_apply_entity_success(test_feature_store): ) def test_apply_entity_integration(test_feature_store): entity = Entity( - name="driver_car_id", - description="Car driver id", - value_type=ValueType.STRING, - tags={"team": "matchmaking"}, + name="driver_car_id", description="Car driver id", tags={"team": "matchmaking"}, ) # Register Entity @@ -142,7 +133,6 @@ def test_apply_entity_integration(test_feature_store): assert ( len(entities) == 1 and entity.name == "driver_car_id" - and entity.value_type == ValueType(ValueProto.ValueType.STRING) and entity.description == "Car driver id" and "team" in entity.tags and entity.tags["team"] == "matchmaking" @@ -151,7 +141,6 @@ def test_apply_entity_integration(test_feature_store): entity = test_feature_store.get_entity("driver_car_id") assert ( entity.name == "driver_car_id" - and entity.value_type == 
ValueType(ValueProto.ValueType.STRING) and entity.description == "Car driver id" and "team" in entity.tags and entity.tags["team"] == "matchmaking" @@ -173,6 +162,8 @@ def test_apply_feature_view_success(test_feature_store): date_partition_column="date_partition_col", ) + entity = Entity(name="fs1_my_entity_1", join_keys=["entity_id"]) + fv1 = FeatureView( name="my_feature_view_1", schema=[ @@ -180,15 +171,16 @@ def test_apply_feature_view_success(test_feature_store): Field(name="fs1_my_feature_2", dtype=String), Field(name="fs1_my_feature_3", dtype=Array(String)), Field(name="fs1_my_feature_4", dtype=Array(Bytes)), + Field(name="entity_id", dtype=Int64), ], - entities=["fs1_my_entity_1"], + entities=[entity], tags={"team": "matchmaking"}, batch_source=batch_source, ttl=timedelta(minutes=5), ) # Register Feature View - test_feature_store.apply([fv1]) + test_feature_store.apply([entity, fv1]) feature_views = test_feature_store.list_feature_views() @@ -217,13 +209,11 @@ def test_apply_feature_view_success(test_feature_store): @pytest.mark.parametrize("dataframe_source", [lazy_fixture("simple_dataset_1")]) def test_feature_view_inference_success(test_feature_store, dataframe_source): with prep_file_source(df=dataframe_source, timestamp_field="ts_1") as file_source: - entity = Entity( - name="id", join_keys=["id_join_key"], value_type=ValueType.INT64 - ) + entity = Entity(name="id", join_keys=["id_join_key"]) fv1 = FeatureView( name="fv1", - entities=["id"], + entities=[entity], ttl=timedelta(minutes=5), online=True, batch_source=file_source, @@ -232,7 +222,7 @@ def test_feature_view_inference_success(test_feature_store, dataframe_source): fv2 = FeatureView( name="fv2", - entities=["id"], + entities=[entity], ttl=timedelta(minutes=5), online=True, batch_source=simple_bq_source_using_table_arg(dataframe_source, "ts_1"), @@ -241,7 +231,7 @@ def test_feature_view_inference_success(test_feature_store, dataframe_source): fv3 = FeatureView( name="fv3", - entities=["id"], + 
entities=[entity], ttl=timedelta(minutes=5), online=True, batch_source=simple_bq_source_using_query_arg(dataframe_source, "ts_1"), @@ -296,6 +286,8 @@ def test_apply_feature_view_integration(test_feature_store): date_partition_column="date_partition_col", ) + entity = Entity(name="fs1_my_entity_1", join_keys=["test"]) + fv1 = FeatureView( name="my_feature_view_1", schema=[ @@ -303,15 +295,16 @@ def test_apply_feature_view_integration(test_feature_store): Field(name="fs1_my_feature_2", dtype=String), Field(name="fs1_my_feature_3", dtype=Array(String)), Field(name="fs1_my_feature_4", dtype=Array(Bytes)), + Field(name="test", dtype=Int64), ], - entities=["fs1_my_entity_1"], + entities=[entity], tags={"team": "matchmaking"}, batch_source=batch_source, ttl=timedelta(minutes=5), ) # Register Feature View - test_feature_store.apply([fv1]) + test_feature_store.apply([fv1, entity]) feature_views = test_feature_store.list_feature_views() @@ -364,13 +357,9 @@ def test_apply_object_and_read(test_feature_store): created_timestamp_column="timestamp", ) - e1 = Entity( - name="fs1_my_entity_1", value_type=ValueType.STRING, description="something" - ) + e1 = Entity(name="fs1_my_entity_1", description="something") - e2 = Entity( - name="fs1_my_entity_2", value_type=ValueType.STRING, description="something" - ) + e2 = Entity(name="fs1_my_entity_2", description="something") fv1 = FeatureView( name="my_feature_view_1", @@ -379,8 +368,9 @@ def test_apply_object_and_read(test_feature_store): Field(name="fs1_my_feature_2", dtype=String), Field(name="fs1_my_feature_3", dtype=Array(String)), Field(name="fs1_my_feature_4", dtype=Array(Bytes)), + Field(name="fs1_my_entity_1", dtype=Int64), ], - entities=["fs1_my_entity_1"], + entities=[e1], tags={"team": "matchmaking"}, batch_source=batch_source, ttl=timedelta(minutes=5), @@ -393,8 +383,9 @@ def test_apply_object_and_read(test_feature_store): Field(name="fs1_my_feature_2", dtype=String), Field(name="fs1_my_feature_3", dtype=Array(String)), 
Field(name="fs1_my_feature_4", dtype=Array(Bytes)), + Field(name="fs1_my_entity_2", dtype=Int64), ], - entities=["fs1_my_entity_1"], + entities=[e2], tags={"team": "matchmaking"}, batch_source=batch_source, ttl=timedelta(minutes=5), @@ -406,7 +397,6 @@ def test_apply_object_and_read(test_feature_store): fv1_actual = test_feature_store.get_feature_view("my_feature_view_1") e1_actual = test_feature_store.get_entity("fs1_my_entity_1") - assert fv1 == fv1_actual assert e1 == e1_actual assert fv2 != fv1_actual assert e2 != e1_actual @@ -434,13 +424,13 @@ def test_apply_remote_repo(): def test_reapply_feature_view_success(test_feature_store, dataframe_source): with prep_file_source(df=dataframe_source, timestamp_field="ts_1") as file_source: - e = Entity(name="id", join_keys=["id_join_key"], value_type=ValueType.STRING) + e = Entity(name="id", join_keys=["id_join_key"]) # Create Feature View fv1 = FeatureView( name="my_feature_view_1", schema=[Field(name="string_col", dtype=String)], - entities=["id"], + entities=[e], batch_source=file_source, ttl=timedelta(minutes=5), ) @@ -470,7 +460,7 @@ def test_reapply_feature_view_success(test_feature_store, dataframe_source): fv1 = FeatureView( name="my_feature_view_1", schema=[Field(name="int64_col", dtype=Int64)], - entities=["id"], + entities=[e], batch_source=file_source, ttl=timedelta(minutes=5), ) @@ -485,10 +475,12 @@ def test_reapply_feature_view_success(test_feature_store, dataframe_source): def test_apply_conflicting_featureview_names(feature_store_with_local_registry): """Test applying feature views with non-case-insensitively unique names""" + driver = Entity(name="driver", join_keys=["driver_id"]) + customer = Entity(name="customer", join_keys=["customer_id"]) driver_stats = FeatureView( name="driver_hourly_stats", - entities=["driver_id"], + entities=[driver], ttl=timedelta(seconds=10), online=False, batch_source=FileSource(path="driver_stats.parquet"), @@ -497,7 +489,7 @@ def 
test_apply_conflicting_featureview_names(feature_store_with_local_registry): customer_stats = FeatureView( name="DRIVER_HOURLY_STATS", - entities=["id"], + entities=[customer], ttl=timedelta(seconds=10), online=False, batch_source=FileSource(path="customer_stats.parquet"), diff --git a/sdk/python/tests/integration/registration/test_inference.py b/sdk/python/tests/integration/registration/test_inference.py index 8b719eb733..aab79af709 100644 --- a/sdk/python/tests/integration/registration/test_inference.py +++ b/sdk/python/tests/integration/registration/test_inference.py @@ -23,8 +23,7 @@ from feast.field import Field from feast.inference import ( update_data_sources_with_inferred_event_timestamp_col, - update_entities_with_inferred_types_from_feature_views, - update_feature_views_with_inferred_features, + update_feature_views_with_inferred_features_and_entities, ) from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import ( SparkSource, @@ -38,47 +37,6 @@ ) -def test_update_entities_with_inferred_types_from_feature_views( - simple_dataset_1, simple_dataset_2 -): - with prep_file_source( - df=simple_dataset_1, timestamp_field="ts_1" - ) as file_source, prep_file_source( - df=simple_dataset_2, timestamp_field="ts_1" - ) as file_source_2: - - fv1 = FeatureView( - name="fv1", entities=["id"], batch_source=file_source, ttl=None, - ) - fv2 = FeatureView( - name="fv2", entities=["id"], batch_source=file_source_2, ttl=None, - ) - - actual_1 = Entity(name="id", join_keys=["id_join_key"]) - actual_2 = Entity(name="id", join_keys=["id_join_key"]) - - update_entities_with_inferred_types_from_feature_views( - [actual_1], [fv1], RepoConfig(provider="local", project="test") - ) - update_entities_with_inferred_types_from_feature_views( - [actual_2], [fv2], RepoConfig(provider="local", project="test") - ) - assert actual_1 == Entity( - name="id", join_keys=["id_join_key"], value_type=ValueType.INT64 - ) - assert actual_2 == Entity( - name="id", 
join_keys=["id_join_key"], value_type=ValueType.STRING - ) - - with pytest.raises(RegistryInferenceFailure): - # two viable data types - update_entities_with_inferred_types_from_feature_views( - [Entity(name="id", join_keys=["id_join_key"])], - [fv1, fv2], - RepoConfig(provider="local", project="test"), - ) - - def test_infer_datasource_names_file(): file_path = "path/to/test.csv" data_source = FileSource(path=file_path) @@ -287,7 +245,7 @@ def test_view_with_missing_feature(features_df: pd.DataFrame) -> pd.DataFrame: test_view_with_missing_feature.infer_features() -def test_update_feature_views_with_inferred_features(): +def test_update_feature_views_with_inferred_features_and_entities(): file_source = FileSource(name="test", path="test path") entity1 = Entity(name="test1", join_keys=["test_column_1"]) entity2 = Entity(name="test2", join_keys=["test_column_2"]) @@ -312,23 +270,26 @@ def test_update_feature_views_with_inferred_features(): ) assert len(feature_view_1.schema) == 2 - assert len(feature_view_1.features) == 2 + assert len(feature_view_1.features) == 1 # The entity field should be deleted from the schema and features of the feature view. - update_feature_views_with_inferred_features( + update_feature_views_with_inferred_features_and_entities( [feature_view_1], [entity1], RepoConfig(provider="local", project="test") ) - assert len(feature_view_1.schema) == 1 + assert len(feature_view_1.schema) == 2 assert len(feature_view_1.features) == 1 assert len(feature_view_2.schema) == 3 - assert len(feature_view_2.features) == 3 + assert len(feature_view_2.features) == 1 # The entity fields should be deleted from the schema and features of the feature view. 
- update_feature_views_with_inferred_features( + update_feature_views_with_inferred_features_and_entities( [feature_view_2], [entity1, entity2], RepoConfig(provider="local", project="test"), ) - assert len(feature_view_2.schema) == 1 + assert len(feature_view_2.schema) == 3 assert len(feature_view_2.features) == 1 + + +# TODO(felixwang9817): Add tests that interact with field mapping. diff --git a/sdk/python/tests/integration/registration/test_registry.py b/sdk/python/tests/integration/registration/test_registry.py index f011d73d2d..dde31ac9e0 100644 --- a/sdk/python/tests/integration/registration/test_registry.py +++ b/sdk/python/tests/integration/registration/test_registry.py @@ -26,7 +26,6 @@ from feast.feature_view import FeatureView from feast.field import Field from feast.on_demand_feature_view import RequestSource, on_demand_feature_view -from feast.protos.feast.types import Value_pb2 as ValueProto from feast.registry import Registry from feast.repo_config import RegistryConfig from feast.types import Array, Bytes, Float32, Int32, Int64, String @@ -73,10 +72,7 @@ def s3_registry() -> Registry: ) def test_apply_entity_success(test_registry): entity = Entity( - name="driver_car_id", - description="Car driver id", - value_type=ValueType.STRING, - tags={"team": "matchmaking"}, + name="driver_car_id", description="Car driver id", tags={"team": "matchmaking"}, ) project = "project" @@ -90,7 +86,6 @@ def test_apply_entity_success(test_registry): assert ( len(entities) == 1 and entity.name == "driver_car_id" - and entity.value_type == ValueType(ValueProto.ValueType.STRING) and entity.description == "Car driver id" and "team" in entity.tags and entity.tags["team"] == "matchmaking" @@ -99,7 +94,6 @@ def test_apply_entity_success(test_registry): entity = test_registry.get_entity("driver_car_id", project) assert ( entity.name == "driver_car_id" - and entity.value_type == ValueType(ValueProto.ValueType.STRING) and entity.description == "Car driver id" and "team" in 
entity.tags and entity.tags["team"] == "matchmaking" @@ -122,10 +116,7 @@ def test_apply_entity_success(test_registry): ) def test_apply_entity_integration(test_registry): entity = Entity( - name="driver_car_id", - description="Car driver id", - value_type=ValueType.STRING, - tags={"team": "matchmaking"}, + name="driver_car_id", description="Car driver id", tags={"team": "matchmaking"}, ) project = "project" @@ -139,7 +130,6 @@ def test_apply_entity_integration(test_registry): assert ( len(entities) == 1 and entity.name == "driver_car_id" - and entity.value_type == ValueType(ValueProto.ValueType.STRING) and entity.description == "Car driver id" and "team" in entity.tags and entity.tags["team"] == "matchmaking" @@ -148,7 +138,6 @@ def test_apply_entity_integration(test_registry): entity = test_registry.get_entity("driver_car_id", project) assert ( entity.name == "driver_car_id" - and entity.value_type == ValueType(ValueProto.ValueType.STRING) and entity.description == "Car driver id" and "team" in entity.tags and entity.tags["team"] == "matchmaking" @@ -173,6 +162,8 @@ def test_apply_feature_view_success(test_registry): created_timestamp_column="timestamp", ) + entity = Entity(name="fs1_my_entity_1", join_keys=["test"]) + fv1 = FeatureView( name="my_feature_view_1", schema=[ @@ -181,7 +172,7 @@ def test_apply_feature_view_success(test_registry): Field(name="fs1_my_feature_3", dtype=Array(String)), Field(name="fs1_my_feature_4", dtype=Array(Bytes)), ], - entities=["fs1_my_entity_1"], + entities=[entity], tags={"team": "matchmaking"}, batch_source=batch_source, ttl=timedelta(minutes=5), @@ -253,10 +244,12 @@ def test_modify_feature_views_success(test_registry, request_source_schema): request_source = RequestSource(name="request_source", schema=request_source_schema,) + entity = Entity(name="fs1_my_entity_1", join_keys=["test"]) + fv1 = FeatureView( name="my_feature_view_1", schema=[Field(name="fs1_my_feature_1", dtype=Int64)], - entities=["fs1_my_entity_1"], + 
entities=[entity], tags={"team": "matchmaking"}, batch_source=batch_source, ttl=timedelta(minutes=5), @@ -369,6 +362,8 @@ def test_apply_feature_view_integration(test_registry): created_timestamp_column="timestamp", ) + entity = Entity(name="fs1_my_entity_1", join_keys=["test"]) + fv1 = FeatureView( name="my_feature_view_1", schema=[ @@ -377,7 +372,7 @@ def test_apply_feature_view_integration(test_registry): Field(name="fs1_my_feature_3", dtype=Array(String)), Field(name="fs1_my_feature_4", dtype=Array(Bytes)), ], - entities=["fs1_my_entity_1"], + entities=[entity], tags={"team": "matchmaking"}, batch_source=batch_source, ttl=timedelta(minutes=5), @@ -444,6 +439,8 @@ def test_apply_data_source(test_registry: Registry): created_timestamp_column="timestamp", ) + entity = Entity(name="fs1_my_entity_1", join_keys=["test"]) + fv1 = FeatureView( name="my_feature_view_1", schema=[ @@ -452,7 +449,7 @@ def test_apply_data_source(test_registry: Registry): Field(name="fs1_my_feature_3", dtype=Array(String)), Field(name="fs1_my_feature_4", dtype=Array(Bytes)), ], - entities=["fs1_my_entity_1"], + entities=[entity], tags={"team": "matchmaking"}, batch_source=batch_source, ttl=timedelta(minutes=5), @@ -499,10 +496,7 @@ def test_commit(): test_registry = Registry(registry_config, None) entity = Entity( - name="driver_car_id", - description="Car driver id", - value_type=ValueType.STRING, - tags={"team": "matchmaking"}, + name="driver_car_id", description="Car driver id", tags={"team": "matchmaking"}, ) project = "project" @@ -517,7 +511,6 @@ def test_commit(): assert ( len(entities) == 1 and entity.name == "driver_car_id" - and entity.value_type == ValueType(ValueProto.ValueType.STRING) and entity.description == "Car driver id" and "team" in entity.tags and entity.tags["team"] == "matchmaking" @@ -526,7 +519,6 @@ def test_commit(): entity = test_registry.get_entity("driver_car_id", project, allow_cache=True) assert ( entity.name == "driver_car_id" - and entity.value_type == 
ValueType(ValueProto.ValueType.STRING) and entity.description == "Car driver id" and "team" in entity.tags and entity.tags["team"] == "matchmaking" @@ -552,7 +544,6 @@ def test_commit(): assert ( len(entities) == 1 and entity.name == "driver_car_id" - and entity.value_type == ValueType(ValueProto.ValueType.STRING) and entity.description == "Car driver id" and "team" in entity.tags and entity.tags["team"] == "matchmaking" @@ -561,7 +552,6 @@ def test_commit(): entity = test_registry.get_entity("driver_car_id", project) assert ( entity.name == "driver_car_id" - and entity.value_type == ValueType(ValueProto.ValueType.STRING) and entity.description == "Car driver id" and "team" in entity.tags and entity.tags["team"] == "matchmaking" diff --git a/sdk/python/tests/integration/registration/test_universal_types.py b/sdk/python/tests/integration/registration/test_universal_types.py index 81fa0200fd..cfb0393d2f 100644 --- a/sdk/python/tests/integration/registration/test_universal_types.py +++ b/sdk/python/tests/integration/registration/test_universal_types.py @@ -9,8 +9,17 @@ import pytest from feast.infra.offline_stores.offline_store import RetrievalJob -from feast.types import Array, Bool, Float32, Int32, Int64, UnixTimestamp -from feast.value_type import ValueType +from feast.types import ( + Array, + Bool, + FeastType, + Float32, + Float64, + Int32, + Int64, + String, + UnixTimestamp, +) from tests.data.data_creator import create_dataset from tests.integration.feature_repos.repo_configuration import ( FULL_REPO_CONFIGS, @@ -26,11 +35,11 @@ def populate_test_configs(offline: bool): entity_type_feature_dtypes = [ - (ValueType.INT32, "int32"), - (ValueType.INT64, "int64"), - (ValueType.STRING, "float"), - (ValueType.STRING, "bool"), - (ValueType.INT32, "datetime"), + (Int32, "int32"), + (Int64, "int64"), + (String, "float"), + (String, "bool"), + (Int32, "datetime"), ] configs: List[TypeTestConfig] = [] for test_repo_config in FULL_REPO_CONFIGS: @@ -60,7 +69,7 @@ def 
populate_test_configs(offline: bool): @dataclass(frozen=True, repr=True) class TypeTestConfig: - entity_type: ValueType + entity_type: FeastType feature_dtype: str feature_is_list: bool has_empty_list: bool @@ -117,6 +126,7 @@ def get_fixtures(request): config.feature_is_list, config.has_empty_list, data_source, + config.entity_type, ) def cleanup(): @@ -138,20 +148,29 @@ def test_entity_inference_types_match(offline_types_test_fixtures): environment, config, data_source, fv = offline_types_test_fixtures fs = environment.feature_store - # Don't specify value type in entity to force inference - entity = driver(value_type=ValueType.UNKNOWN) + # TODO(felixwang9817): Refactor this by finding a better way to force type inference. + # Override the schema and entity_columns to force entity inference. + entity = driver() + fv.schema = list(filter(lambda x: x.name != entity.join_key, fv.schema)) + fv.entity_columns = [] fs.apply([fv, entity]) entities = fs.list_entities() entity_type_to_expected_inferred_entity_type = { - ValueType.INT32: ValueType.INT64, - ValueType.INT64: ValueType.INT64, - ValueType.FLOAT: ValueType.DOUBLE, - ValueType.STRING: ValueType.STRING, + Int32: Int64, + Int64: Int64, + Float32: Float64, + String: String, } + for entity in entities: + entity_columns = list( + filter(lambda x: x.name == entity.join_key, fv.entity_columns) + ) + assert len(entity_columns) == 1 + entity_column = entity_columns[0] assert ( - entity.value_type + entity_column.dtype == entity_type_to_expected_inferred_entity_type[config.entity_type] ) @@ -173,13 +192,12 @@ def test_feature_get_historical_features_types_match(offline_types_test_fixtures config.feature_is_list, config.has_empty_list, data_source, + config.entity_type, ) fs.apply([fv, entity]) entity_df = pd.DataFrame() - entity_df["driver_id"] = ( - ["1", "3"] if config.entity_type == ValueType.STRING else [1, 3] - ) + entity_df["driver_id"] = ["1", "3"] if config.entity_type == String else [1, 3] ts = 
pd.Timestamp(datetime.utcnow()).round("ms") entity_df["ts"] = [ ts - timedelta(hours=4), @@ -222,10 +240,11 @@ def test_feature_get_online_features_types_match(online_types_test_fixtures): config.feature_is_list, config.has_empty_list, data_source, + config.entity_type, ) fs = environment.feature_store features = [fv.name + ":value"] - entity = driver(value_type=config.entity_type) + entity = driver() fs.apply([fv, entity]) fs.materialize( environment.start_date, @@ -234,7 +253,7 @@ def test_feature_get_online_features_types_match(online_types_test_fixtures): # we can successfully infer type even from all empty values ) - driver_id_value = "1" if config.entity_type == ValueType.STRING else 1 + driver_id_value = "1" if config.entity_type == String else 1 online_features = fs.get_online_features( features=features, entity_rows=[{"driver_id": driver_id_value}], ).to_dict() @@ -267,7 +286,7 @@ def test_feature_get_online_features_types_match(online_types_test_fixtures): def create_feature_view( - name, feature_dtype, feature_is_list, has_empty_list, data_source + name, feature_dtype, feature_is_list, has_empty_list, data_source, entity_type ): if feature_is_list is True: if feature_dtype == "int32": @@ -292,7 +311,9 @@ def create_feature_view( elif feature_dtype == "datetime": dtype = UnixTimestamp - return driver_feature_view(data_source, name=name, dtype=dtype,) + return driver_feature_view( + data_source, name=name, dtype=dtype, entity_type=entity_type + ) def assert_expected_historical_feature_types( diff --git a/sdk/python/tests/integration/scaffolding/test_partial_apply.py b/sdk/python/tests/integration/scaffolding/test_partial_apply.py index 3ab9bf196f..e5a7206b96 100644 --- a/sdk/python/tests/integration/scaffolding/test_partial_apply.py +++ b/sdk/python/tests/integration/scaffolding/test_partial_apply.py @@ -2,7 +2,7 @@ import pytest -from feast import BigQuerySource, FeatureView, Field +from feast import BigQuerySource, Entity, FeatureView, Field from 
feast.types import Float32, String from tests.utils.cli_utils import CliRunner, get_example_repo from tests.utils.online_read_write_test import basic_rw_test @@ -19,6 +19,7 @@ def test_partial() -> None: with runner.local_repo( get_example_repo("example_feature_repo_1.py"), "bigquery" ) as store: + driver = Entity(name="driver", join_keys=["test"]) driver_locations_source = BigQuerySource( table="feast-oss.public.drivers", @@ -28,12 +29,13 @@ def test_partial() -> None: driver_locations_100 = FeatureView( name="driver_locations_100", - entities=["driver"], + entities=[driver], ttl=timedelta(days=1), schema=[ Field(name="lat", dtype=Float32), Field(name="lon", dtype=String), Field(name="name", dtype=String), + Field(name="test", dtype=String), ], online=True, batch_source=driver_locations_source, diff --git a/sdk/python/tests/unit/diff/test_registry_diff.py b/sdk/python/tests/unit/diff/test_registry_diff.py index 483dae73e2..ae10c834c8 100644 --- a/sdk/python/tests/unit/diff/test_registry_diff.py +++ b/sdk/python/tests/unit/diff/test_registry_diff.py @@ -2,34 +2,36 @@ diff_registry_objects, tag_objects_for_keep_delete_update_add, ) +from feast.entity import Entity from feast.feature_view import FeatureView from tests.utils.data_source_utils import prep_file_source def test_tag_objects_for_keep_delete_update_add(simple_dataset_1): with prep_file_source(df=simple_dataset_1, timestamp_field="ts_1") as file_source: + entity = Entity(name="id", join_keys=["id"]) to_delete = FeatureView( - name="to_delete", entities=["id"], batch_source=file_source, ttl=None, + name="to_delete", entities=[entity], batch_source=file_source, ttl=None, ) unchanged_fv = FeatureView( - name="fv1", entities=["id"], batch_source=file_source, ttl=None, + name="fv1", entities=[entity], batch_source=file_source, ttl=None, ) pre_changed = FeatureView( name="fv2", - entities=["id"], + entities=[entity], batch_source=file_source, ttl=None, tags={"when": "before"}, ) post_changed = FeatureView( 
name="fv2", - entities=["id"], + entities=[entity], batch_source=file_source, ttl=None, tags={"when": "after"}, ) to_add = FeatureView( - name="to_add", entities=["id"], batch_source=file_source, ttl=None, + name="to_add", entities=[entity], batch_source=file_source, ttl=None, ) keep, delete, update, add = tag_objects_for_keep_delete_update_add( @@ -52,16 +54,17 @@ def test_tag_objects_for_keep_delete_update_add(simple_dataset_1): def test_diff_registry_objects_feature_views(simple_dataset_1): with prep_file_source(df=simple_dataset_1, timestamp_field="ts_1") as file_source: + entity = Entity(name="id", join_keys=["id"]) pre_changed = FeatureView( name="fv2", - entities=["id"], + entities=[entity], batch_source=file_source, ttl=None, tags={"when": "before"}, ) post_changed = FeatureView( name="fv2", - entities=["id"], + entities=[entity], batch_source=file_source, ttl=None, tags={"when": "after"}, diff --git a/sdk/python/tests/unit/infra/test_provider.py b/sdk/python/tests/unit/infra/test_provider.py index 43c09760e9..5ed5603b03 100644 --- a/sdk/python/tests/unit/infra/test_provider.py +++ b/sdk/python/tests/unit/infra/test_provider.py @@ -20,14 +20,13 @@ from feast.field import Field from feast.infra.provider import _get_column_names from feast.types import String -from feast.value_type import ValueType def test_get_column_names_preserves_feature_ordering(): - entity = Entity("my-entity", description="My entity", value_type=ValueType.STRING) + entity = Entity("my-entity", description="My entity") fv = FeatureView( name="my-fv", - entities=["my-entity"], + entities=[entity], ttl=timedelta(days=1), batch_source=BigQuerySource(table="non-existent-mock"), schema=[ diff --git a/sdk/python/tests/unit/test_entity.py b/sdk/python/tests/unit/test_entity.py index 254a975f67..04a857ddef 100644 --- a/sdk/python/tests/unit/test_entity.py +++ b/sdk/python/tests/unit/test_entity.py @@ -20,19 +20,14 @@ def test_join_key_default(): with pytest.deprecated_call(): - entity = Entity( 
- "my-entity", description="My entity", value_type=ValueType.STRING - ) + entity = Entity("my-entity", description="My entity") assert entity.join_key == "my-entity" def test_entity_class_contains_tags(): with pytest.deprecated_call(): entity = Entity( - "my-entity", - description="My entity", - value_type=ValueType.STRING, - tags={"key1": "val1", "key2": "val2"}, + "my-entity", description="My entity", tags={"key1": "val1", "key2": "val2"}, ) assert "key1" in entity.tags.keys() and entity.tags["key1"] == "val1" assert "key2" in entity.tags.keys() and entity.tags["key2"] == "val2" @@ -40,20 +35,18 @@ def test_entity_class_contains_tags(): def test_entity_without_tags_empty_dict(): with pytest.deprecated_call(): - entity = Entity( - "my-entity", description="My entity", value_type=ValueType.STRING - ) + entity = Entity("my-entity", description="My entity") assert entity.tags == dict() assert len(entity.tags) == 0 def test_entity_without_description(): with pytest.deprecated_call(): - Entity("my-entity", value_type=ValueType.STRING) + Entity("my-entity") def test_name_not_specified(): - assertpy.assert_that(lambda: Entity(value_type=ValueType.STRING)).raises(ValueError) + assertpy.assert_that(lambda: Entity()).raises(ValueError) def test_multiple_args(): @@ -61,15 +54,19 @@ def test_multiple_args(): def test_name_keyword(recwarn): - Entity(name="my-entity", value_type=ValueType.STRING) + Entity(name="my-entity") assert len(recwarn) == 0 + Entity(name="my-entity", join_key="test") + assert len(recwarn) == 1 + Entity(name="my-entity", join_keys=["test"]) + assert len(recwarn) == 1 def test_hash(): - entity1 = Entity(name="my-entity", value_type=ValueType.STRING) - entity2 = Entity(name="my-entity", value_type=ValueType.STRING) - entity3 = Entity(name="my-entity", value_type=ValueType.FLOAT) - entity4 = Entity(name="my-entity", value_type=ValueType.FLOAT, description="test") + entity1 = Entity(name="my-entity") + entity2 = Entity(name="my-entity") + entity3 = 
Entity(name="my-entity", join_keys=["not-my-entity"]) + entity4 = Entity(name="my-entity", join_keys=["not-my-entity"], description="test") s1 = {entity1, entity2} assert len(s1) == 1 diff --git a/sdk/python/tests/unit/test_feature_view.py b/sdk/python/tests/unit/test_feature_view.py index 80a583806e..1ef36081ec 100644 --- a/sdk/python/tests/unit/test_feature_view.py +++ b/sdk/python/tests/unit/test_feature_view.py @@ -62,3 +62,7 @@ def test_hash(): s4 = {feature_view_1, feature_view_2, feature_view_3, feature_view_4} assert len(s4) == 3 + + +# TODO(felixwang9817): Add tests for proto conversion. +# TODO(felixwang9817): Add tests for field mapping logic. diff --git a/sdk/python/tests/unit/test_unit_feature_store.py b/sdk/python/tests/unit/test_unit_feature_store.py index 6f9dd6acb0..0c13dffa62 100644 --- a/sdk/python/tests/unit/test_unit_feature_store.py +++ b/sdk/python/tests/unit/test_unit_feature_store.py @@ -17,7 +17,7 @@ class MockFeatureView: projection: MockFeatureViewProjection -def test__get_unique_entities(): +def test_get_unique_entities(): entity_values = { "entity_1": [Value(int64_val=1), Value(int64_val=2), Value(int64_val=1)], "entity_2": [ diff --git a/sdk/python/tests/utils/online_write_benchmark.py b/sdk/python/tests/utils/online_write_benchmark.py index 82ffc8e98b..9f2f8ba60d 100644 --- a/sdk/python/tests/utils/online_write_benchmark.py +++ b/sdk/python/tests/utils/online_write_benchmark.py @@ -17,13 +17,13 @@ from feast.infra.provider import _convert_arrow_to_proto from feast.repo_config import RepoConfig from feast.types import Float32, Int32 -from feast.value_type import ValueType def create_driver_hourly_stats_feature_view(source): + driver = Entity(name="driver", join_keys=["driver_id"]) driver_stats_feature_view = FeatureView( name="driver_stats", - entities=["driver_id"], + entities=[driver], schema=[ Field(name="conv_rate", dtype=Float32), Field(name="acc_rate", dtype=Float32), @@ -61,7 +61,7 @@ def benchmark_writes(): # This is just to 
set data source to something, we're not reading from parquet source here. parquet_path = os.path.join(temp_dir, "data.parquet") - driver = Entity(name="driver_id", value_type=ValueType.INT64) + driver = Entity(name="driver_id") table = create_driver_hourly_stats_feature_view( create_driver_hourly_stats_source(parquet_path=parquet_path) )