Skip to content

Commit

Permalink
chore: Rename inputs parameter to sources for on demand feature views (
Browse files Browse the repository at this point in the history
…#2442)

* Rename `inputs` parameter to `sources` for odfv

Signed-off-by: Felix Wang <[email protected]>

* Address CR comments

Signed-off-by: Felix Wang <[email protected]>

* Fix Go references to ODFV proto

Signed-off-by: Felix Wang <[email protected]>

* Fix tests

Signed-off-by: Felix Wang <[email protected]>

* Fix Java

Signed-off-by: Felix Wang <[email protected]>

* Fix Java again

Signed-off-by: Felix Wang <[email protected]>

* Fix Python integration tests

Signed-off-by: Felix Wang <[email protected]>
  • Loading branch information
felixwang9817 authored Mar 29, 2022
1 parent 6c55e49 commit c4adf80
Show file tree
Hide file tree
Showing 11 changed files with 161 additions and 122 deletions.
32 changes: 16 additions & 16 deletions go/internal/feast/ondemandfeatureview.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,27 @@ import (

type OnDemandFeatureView struct {
base *BaseFeatureView
inputFeatureViewProjections map[string]*FeatureViewProjection
inputRequestDataSources map[string]*core.DataSource_RequestDataOptions
sourceFeatureViewProjections map[string]*FeatureViewProjection
sourceRequestDataSources map[string]*core.DataSource_RequestDataOptions
}

func NewOnDemandFeatureViewFromProto(proto *core.OnDemandFeatureView) *OnDemandFeatureView {
onDemandFeatureView := &OnDemandFeatureView{base: NewBaseFeatureView(proto.Spec.Name, proto.Spec.Features),
inputFeatureViewProjections: make(map[string]*FeatureViewProjection),
inputRequestDataSources: make(map[string]*core.DataSource_RequestDataOptions),
sourceFeatureViewProjections: make(map[string]*FeatureViewProjection),
sourceRequestDataSources: make(map[string]*core.DataSource_RequestDataOptions),
}
for inputName, onDemandInput := range proto.Spec.Inputs {
if onDemandInputFeatureView, ok := onDemandInput.Input.(*core.OnDemandInput_FeatureView); ok {
featureViewProto := onDemandInputFeatureView.FeatureView
for sourceName, onDemandSource := range proto.Spec.Sources {
if onDemandSourceFeatureView, ok := onDemandSource.Source.(*core.OnDemandSource_FeatureView); ok {
featureViewProto := onDemandSourceFeatureView.FeatureView
featureView := NewFeatureViewFromProto(featureViewProto)
onDemandFeatureView.inputFeatureViewProjections[inputName] = featureView.base.projection
} else if onDemandInputFeatureViewProjection, ok := onDemandInput.Input.(*core.OnDemandInput_FeatureViewProjection); ok {
featureProjectionProto := onDemandInputFeatureViewProjection.FeatureViewProjection
onDemandFeatureView.inputFeatureViewProjections[inputName] = NewFeatureViewProjectionFromProto(featureProjectionProto)
} else if onDemandInputRequestFeatureView, ok := onDemandInput.Input.(*core.OnDemandInput_RequestDataSource); ok {

if dataSourceRequestOptions, ok := onDemandInputRequestFeatureView.RequestDataSource.Options.(*core.DataSource_RequestDataOptions_); ok {
onDemandFeatureView.inputRequestDataSources[inputName] = dataSourceRequestOptions.RequestDataOptions
onDemandFeatureView.sourceFeatureViewProjections[sourceName] = featureView.base.projection
} else if onDemandSourceFeatureViewProjection, ok := onDemandSource.Source.(*core.OnDemandSource_FeatureViewProjection); ok {
featureProjectionProto := onDemandSourceFeatureViewProjection.FeatureViewProjection
onDemandFeatureView.sourceFeatureViewProjections[sourceName] = NewFeatureViewProjectionFromProto(featureProjectionProto)
} else if onDemandSourceRequestFeatureView, ok := onDemandSource.Source.(*core.OnDemandSource_RequestDataSource); ok {

if dataSourceRequestOptions, ok := onDemandSourceRequestFeatureView.RequestDataSource.Options.(*core.DataSource_RequestDataOptions_); ok {
onDemandFeatureView.sourceRequestDataSources[sourceName] = dataSourceRequestOptions.RequestDataOptions
}
}
}
Expand All @@ -43,7 +43,7 @@ func (fs *OnDemandFeatureView) NewOnDemandFeatureViewFromBase(base *BaseFeatureV

func (fs *OnDemandFeatureView) getRequestDataSchema() map[string]types.ValueType_Enum {
schema := make(map[string]types.ValueType_Enum)
for _, requestDataSource := range fs.inputRequestDataSources {
for _, requestDataSource := range fs.sourceRequestDataSources {
for fieldName, fieldValueType := range requestDataSource.Schema {
schema[fieldName] = fieldValueType
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,16 @@ public ServingAPIProto.GetOnlineFeaturesResponse getOnlineFeatures(
.collect(Collectors.toList());

// ToDo (pyalex): refactor transformation service to delete unused left part of the returned
// Pair from extractRequestDataFeatureNamesAndOnDemandFeatureInputs.
// Pair from extractRequestDataFeatureNamesAndOnDemandFeatureSources.
// Currently, we can retrieve context variables directly from GetOnlineFeaturesRequest.
List<FeatureReferenceV2> onDemandFeatureInputs =
List<FeatureReferenceV2> onDemandFeatureSources =
this.onlineTransformationService.extractOnDemandFeaturesDependencies(
onDemandFeatureReferences);

// Add on demand feature inputs to list of feature references to retrieve.
for (FeatureReferenceV2 onDemandFeatureInput : onDemandFeatureInputs) {
if (!retrievedFeatureReferences.contains(onDemandFeatureInput)) {
retrievedFeatureReferences.add(onDemandFeatureInput);
// Add on demand feature sources to list of feature references to retrieve.
for (FeatureReferenceV2 onDemandFeatureSource : onDemandFeatureSources) {
if (!retrievedFeatureReferences.contains(onDemandFeatureSource)) {
retrievedFeatureReferences.add(onDemandFeatureSource);
}
}

Expand Down Expand Up @@ -194,7 +194,7 @@ public ServingAPIProto.GetOnlineFeaturesResponse getOnlineFeatures(
// data.
this.populateOnDemandFeatures(
onDemandFeatureReferences,
onDemandFeatureInputs,
onDemandFeatureSources,
retrievedFeatureReferences,
request,
features,
Expand Down Expand Up @@ -257,7 +257,7 @@ private List<Map<String, ValueProto.Value>> getEntityRows(

private void populateOnDemandFeatures(
List<FeatureReferenceV2> onDemandFeatureReferences,
List<FeatureReferenceV2> onDemandFeatureInputs,
List<FeatureReferenceV2> onDemandFeatureSources,
List<FeatureReferenceV2> retrievedFeatureReferences,
ServingAPIProto.GetOnlineFeaturesRequest request,
List<List<feast.storage.api.retriever.Feature>> features,
Expand All @@ -271,7 +271,7 @@ private void populateOnDemandFeatures(
for (int featureIdx = 0; featureIdx < retrievedFeatureReferences.size(); featureIdx++) {
FeatureReferenceV2 featureReference = retrievedFeatureReferences.get(featureIdx);

if (!onDemandFeatureInputs.contains(featureReference)) {
if (!onDemandFeatureSources.contains(featureReference)) {
continue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,55 +83,55 @@ public TransformFeaturesResponse transformFeatures(
@Override
public List<ServingAPIProto.FeatureReferenceV2> extractOnDemandFeaturesDependencies(
List<ServingAPIProto.FeatureReferenceV2> onDemandFeatureReferences) {
List<ServingAPIProto.FeatureReferenceV2> onDemandFeatureInputs = new ArrayList<>();
List<ServingAPIProto.FeatureReferenceV2> onDemandFeatureSources = new ArrayList<>();
for (ServingAPIProto.FeatureReferenceV2 featureReference : onDemandFeatureReferences) {
OnDemandFeatureViewProto.OnDemandFeatureViewSpec onDemandFeatureViewSpec =
this.registryRepository.getOnDemandFeatureViewSpec(featureReference);
Map<String, OnDemandFeatureViewProto.OnDemandInput> inputs =
onDemandFeatureViewSpec.getInputsMap();
Map<String, OnDemandFeatureViewProto.OnDemandSource> sources =
onDemandFeatureViewSpec.getSourcesMap();

for (OnDemandFeatureViewProto.OnDemandInput input : inputs.values()) {
OnDemandFeatureViewProto.OnDemandInput.InputCase inputCase = input.getInputCase();
switch (inputCase) {
for (OnDemandFeatureViewProto.OnDemandSource source : sources.values()) {
OnDemandFeatureViewProto.OnDemandSource.SourceCase sourceCase = source.getSourceCase();
switch (sourceCase) {
case REQUEST_DATA_SOURCE:
// Do nothing. The value should be provided as dedicated request parameter
break;
case FEATURE_VIEW_PROJECTION:
FeatureReferenceProto.FeatureViewProjection projection =
input.getFeatureViewProjection();
source.getFeatureViewProjection();
for (FeatureProto.FeatureSpecV2 featureSpec : projection.getFeatureColumnsList()) {
String featureName = featureSpec.getName();
ServingAPIProto.FeatureReferenceV2 onDemandFeatureInput =
ServingAPIProto.FeatureReferenceV2 onDemandFeatureSource =
ServingAPIProto.FeatureReferenceV2.newBuilder()
.setFeatureViewName(projection.getFeatureViewName())
.setFeatureName(featureName)
.build();
onDemandFeatureInputs.add(onDemandFeatureInput);
onDemandFeatureSources.add(onDemandFeatureSource);
}
break;
case FEATURE_VIEW:
FeatureViewProto.FeatureView featureView = input.getFeatureView();
FeatureViewProto.FeatureView featureView = source.getFeatureView();
FeatureViewProto.FeatureViewSpec featureViewSpec = featureView.getSpec();
String featureViewName = featureViewSpec.getName();
for (FeatureProto.FeatureSpecV2 featureSpec : featureViewSpec.getFeaturesList()) {
String featureName = featureSpec.getName();
ServingAPIProto.FeatureReferenceV2 onDemandFeatureInput =
ServingAPIProto.FeatureReferenceV2 onDemandFeatureSource =
ServingAPIProto.FeatureReferenceV2.newBuilder()
.setFeatureViewName(featureViewName)
.setFeatureName(featureName)
.build();
onDemandFeatureInputs.add(onDemandFeatureInput);
onDemandFeatureSources.add(onDemandFeatureSource);
}
break;
default:
throw Status.INTERNAL
.withDescription(
"OnDemandInput proto input field has an unexpected type: " + inputCase)
"OnDemandSource proto source field has an unexpected type: " + sourceCase)
.asRuntimeException();
}
}
}
return onDemandFeatureInputs;
return onDemandFeatureSources;
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -321,8 +321,8 @@ public ValueType serializeValuesIntoArrowIPC(List<Pair<String, List<ValueProto.V
.asRuntimeException();
}
byte[] byteData = out.toByteArray();
ByteString inputData = ByteString.copyFrom(byteData);
ValueType transformationInput = ValueType.newBuilder().setArrowValue(inputData).build();
ByteString sourceData = ByteString.copyFrom(byteData);
ValueType transformationInput = ValueType.newBuilder().setArrowValue(sourceData).build();
return transformationInput;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ public interface TransformationService {
TransformFeaturesResponse transformFeatures(TransformFeaturesRequest transformFeaturesRequest);

/**
* Extract the list of on demand feature inputs from a list of ODFV references.
* Extract the list of on demand feature sources from a list of ODFV references.
*
* @param onDemandFeatureReferences list of ODFV references to be parsed
* @return list of on demand feature inputs
* @return list of on demand feature sources
*/
List<ServingAPIProto.FeatureReferenceV2> extractOnDemandFeaturesDependencies(
List<ServingAPIProto.FeatureReferenceV2> onDemandFeatureReferences);
Expand Down
8 changes: 4 additions & 4 deletions protos/feast/core/OnDemandFeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ message OnDemandFeatureViewSpec {
// List of features specifications for each feature defined with this feature view.
repeated FeatureSpecV2 features = 3;

// Map of inputs for this feature view.
map<string, OnDemandInput> inputs = 4;
// Map of sources for this feature view.
map<string, OnDemandSource> sources = 4;

UserDefinedFunction user_defined_function = 5;

Expand All @@ -68,8 +68,8 @@ message OnDemandFeatureViewMeta {
google.protobuf.Timestamp last_updated_timestamp = 2;
}

message OnDemandInput {
oneof input {
message OnDemandSource {
oneof source {
FeatureView feature_view = 1;
FeatureViewProjection feature_view_projection = 3;
DataSource request_data_source = 2;
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ def feature_view_list(ctx: click.Context):
if isinstance(feature_view, FeatureView):
entities.update(feature_view.entities)
elif isinstance(feature_view, OnDemandFeatureView):
for backing_fv in feature_view.input_feature_view_projections.values():
for backing_fv in feature_view.source_feature_view_projections.values():
entities.update(store.get_feature_view(backing_fv.name).entities)
table.append(
[
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,7 @@ def apply(
data_sources_set_to_update.add(rfv.request_data_source)

for odfv in odfvs_to_update:
for v in odfv.input_request_data_sources.values():
for v in odfv.source_request_data_sources.values():
data_sources_set_to_update.add(v)

data_sources_to_update = list(data_sources_set_to_update)
Expand Down Expand Up @@ -1878,7 +1878,7 @@ def _get_feature_views_to_use(
odfv = od_fvs[fv_name].with_projection(copy.copy(projection))
od_fvs_to_use.append(odfv)
# Let's make sure to include an FVs which the ODFV requires Features from.
for projection in odfv.input_feature_view_projections.values():
for projection in odfv.source_feature_view_projections.values():
fv = fvs[projection.name].with_projection(copy.copy(projection))
if fv not in fvs_to_use:
fvs_to_use.append(fv)
Expand Down Expand Up @@ -2005,7 +2005,7 @@ def _group_feature_refs(
# Let's also add in any FV Feature dependencies here.
for input_fv_projection in on_demand_view_index[
view_name
].input_feature_view_projections.values():
].source_feature_view_projections.values():
for input_feat in input_fv_projection.features:
views_features[input_fv_projection.name].add(input_feat.name)
elif view_name in request_view_index:
Expand Down
Loading

0 comments on commit c4adf80

Please sign in to comment.