Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Rename inputs parameter to sources for on demand feature views #2442

Merged
merged 7 commits into from
Mar 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions go/internal/feast/ondemandfeatureview.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,27 @@ import (

type OnDemandFeatureView struct {
base *BaseFeatureView
inputFeatureViewProjections map[string]*FeatureViewProjection
inputRequestDataSources map[string]*core.DataSource_RequestDataOptions
sourceFeatureViewProjections map[string]*FeatureViewProjection
sourceRequestDataSources map[string]*core.DataSource_RequestDataOptions
}

func NewOnDemandFeatureViewFromProto(proto *core.OnDemandFeatureView) *OnDemandFeatureView {
onDemandFeatureView := &OnDemandFeatureView{base: NewBaseFeatureView(proto.Spec.Name, proto.Spec.Features),
inputFeatureViewProjections: make(map[string]*FeatureViewProjection),
inputRequestDataSources: make(map[string]*core.DataSource_RequestDataOptions),
sourceFeatureViewProjections: make(map[string]*FeatureViewProjection),
sourceRequestDataSources: make(map[string]*core.DataSource_RequestDataOptions),
}
for inputName, onDemandInput := range proto.Spec.Inputs {
if onDemandInputFeatureView, ok := onDemandInput.Input.(*core.OnDemandInput_FeatureView); ok {
featureViewProto := onDemandInputFeatureView.FeatureView
for sourceName, onDemandSource := range proto.Spec.Sources {
if onDemandSourceFeatureView, ok := onDemandSource.Source.(*core.OnDemandSource_FeatureView); ok {
featureViewProto := onDemandSourceFeatureView.FeatureView
featureView := NewFeatureViewFromProto(featureViewProto)
onDemandFeatureView.inputFeatureViewProjections[inputName] = featureView.base.projection
} else if onDemandInputFeatureViewProjection, ok := onDemandInput.Input.(*core.OnDemandInput_FeatureViewProjection); ok {
featureProjectionProto := onDemandInputFeatureViewProjection.FeatureViewProjection
onDemandFeatureView.inputFeatureViewProjections[inputName] = NewFeatureViewProjectionFromProto(featureProjectionProto)
} else if onDemandInputRequestFeatureView, ok := onDemandInput.Input.(*core.OnDemandInput_RequestDataSource); ok {

if dataSourceRequestOptions, ok := onDemandInputRequestFeatureView.RequestDataSource.Options.(*core.DataSource_RequestDataOptions_); ok {
onDemandFeatureView.inputRequestDataSources[inputName] = dataSourceRequestOptions.RequestDataOptions
onDemandFeatureView.sourceFeatureViewProjections[sourceName] = featureView.base.projection
} else if onDemandSourceFeatureViewProjection, ok := onDemandSource.Source.(*core.OnDemandSource_FeatureViewProjection); ok {
featureProjectionProto := onDemandSourceFeatureViewProjection.FeatureViewProjection
onDemandFeatureView.sourceFeatureViewProjections[sourceName] = NewFeatureViewProjectionFromProto(featureProjectionProto)
} else if onDemandSourceRequestFeatureView, ok := onDemandSource.Source.(*core.OnDemandSource_RequestDataSource); ok {

if dataSourceRequestOptions, ok := onDemandSourceRequestFeatureView.RequestDataSource.Options.(*core.DataSource_RequestDataOptions_); ok {
onDemandFeatureView.sourceRequestDataSources[sourceName] = dataSourceRequestOptions.RequestDataOptions
}
}
}
Expand All @@ -43,7 +43,7 @@ func (fs *OnDemandFeatureView) NewOnDemandFeatureViewFromBase(base *BaseFeatureV

func (fs *OnDemandFeatureView) getRequestDataSchema() map[string]types.ValueType_Enum {
schema := make(map[string]types.ValueType_Enum)
for _, requestDataSource := range fs.inputRequestDataSources {
for _, requestDataSource := range fs.sourceRequestDataSources {
for fieldName, fieldValueType := range requestDataSource.Schema {
schema[fieldName] = fieldValueType
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,16 @@ public ServingAPIProto.GetOnlineFeaturesResponse getOnlineFeatures(
.collect(Collectors.toList());

// ToDo (pyalex): refactor transformation service to delete unused left part of the returned
// Pair from extractRequestDataFeatureNamesAndOnDemandFeatureInputs.
// Pair from extractRequestDataFeatureNamesAndOnDemandFeatureSources.
// Currently, we can retrieve context variables directly from GetOnlineFeaturesRequest.
List<FeatureReferenceV2> onDemandFeatureInputs =
List<FeatureReferenceV2> onDemandFeatureSources =
this.onlineTransformationService.extractOnDemandFeaturesDependencies(
onDemandFeatureReferences);

// Add on demand feature inputs to list of feature references to retrieve.
for (FeatureReferenceV2 onDemandFeatureInput : onDemandFeatureInputs) {
if (!retrievedFeatureReferences.contains(onDemandFeatureInput)) {
retrievedFeatureReferences.add(onDemandFeatureInput);
// Add on demand feature sources to list of feature references to retrieve.
for (FeatureReferenceV2 onDemandFeatureSource : onDemandFeatureSources) {
if (!retrievedFeatureReferences.contains(onDemandFeatureSource)) {
retrievedFeatureReferences.add(onDemandFeatureSource);
}
}

Expand Down Expand Up @@ -194,7 +194,7 @@ public ServingAPIProto.GetOnlineFeaturesResponse getOnlineFeatures(
// data.
this.populateOnDemandFeatures(
onDemandFeatureReferences,
onDemandFeatureInputs,
onDemandFeatureSources,
retrievedFeatureReferences,
request,
features,
Expand Down Expand Up @@ -257,7 +257,7 @@ private List<Map<String, ValueProto.Value>> getEntityRows(

private void populateOnDemandFeatures(
List<FeatureReferenceV2> onDemandFeatureReferences,
List<FeatureReferenceV2> onDemandFeatureInputs,
List<FeatureReferenceV2> onDemandFeatureSources,
List<FeatureReferenceV2> retrievedFeatureReferences,
ServingAPIProto.GetOnlineFeaturesRequest request,
List<List<feast.storage.api.retriever.Feature>> features,
Expand All @@ -271,7 +271,7 @@ private void populateOnDemandFeatures(
for (int featureIdx = 0; featureIdx < retrievedFeatureReferences.size(); featureIdx++) {
FeatureReferenceV2 featureReference = retrievedFeatureReferences.get(featureIdx);

if (!onDemandFeatureInputs.contains(featureReference)) {
if (!onDemandFeatureSources.contains(featureReference)) {
continue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,55 +83,55 @@ public TransformFeaturesResponse transformFeatures(
@Override
public List<ServingAPIProto.FeatureReferenceV2> extractOnDemandFeaturesDependencies(
List<ServingAPIProto.FeatureReferenceV2> onDemandFeatureReferences) {
List<ServingAPIProto.FeatureReferenceV2> onDemandFeatureInputs = new ArrayList<>();
List<ServingAPIProto.FeatureReferenceV2> onDemandFeatureSources = new ArrayList<>();
for (ServingAPIProto.FeatureReferenceV2 featureReference : onDemandFeatureReferences) {
OnDemandFeatureViewProto.OnDemandFeatureViewSpec onDemandFeatureViewSpec =
this.registryRepository.getOnDemandFeatureViewSpec(featureReference);
Map<String, OnDemandFeatureViewProto.OnDemandInput> inputs =
onDemandFeatureViewSpec.getInputsMap();
Map<String, OnDemandFeatureViewProto.OnDemandSource> sources =
onDemandFeatureViewSpec.getSourcesMap();

for (OnDemandFeatureViewProto.OnDemandInput input : inputs.values()) {
OnDemandFeatureViewProto.OnDemandInput.InputCase inputCase = input.getInputCase();
switch (inputCase) {
for (OnDemandFeatureViewProto.OnDemandSource source : sources.values()) {
OnDemandFeatureViewProto.OnDemandSource.SourceCase sourceCase = source.getSourceCase();
switch (sourceCase) {
case REQUEST_DATA_SOURCE:
// Do nothing. The value should be provided as dedicated request parameter
break;
case FEATURE_VIEW_PROJECTION:
FeatureReferenceProto.FeatureViewProjection projection =
input.getFeatureViewProjection();
source.getFeatureViewProjection();
for (FeatureProto.FeatureSpecV2 featureSpec : projection.getFeatureColumnsList()) {
String featureName = featureSpec.getName();
ServingAPIProto.FeatureReferenceV2 onDemandFeatureInput =
ServingAPIProto.FeatureReferenceV2 onDemandFeatureSource =
ServingAPIProto.FeatureReferenceV2.newBuilder()
.setFeatureViewName(projection.getFeatureViewName())
.setFeatureName(featureName)
.build();
onDemandFeatureInputs.add(onDemandFeatureInput);
onDemandFeatureSources.add(onDemandFeatureSource);
}
break;
case FEATURE_VIEW:
FeatureViewProto.FeatureView featureView = input.getFeatureView();
FeatureViewProto.FeatureView featureView = source.getFeatureView();
FeatureViewProto.FeatureViewSpec featureViewSpec = featureView.getSpec();
String featureViewName = featureViewSpec.getName();
for (FeatureProto.FeatureSpecV2 featureSpec : featureViewSpec.getFeaturesList()) {
String featureName = featureSpec.getName();
ServingAPIProto.FeatureReferenceV2 onDemandFeatureInput =
ServingAPIProto.FeatureReferenceV2 onDemandFeatureSource =
ServingAPIProto.FeatureReferenceV2.newBuilder()
.setFeatureViewName(featureViewName)
.setFeatureName(featureName)
.build();
onDemandFeatureInputs.add(onDemandFeatureInput);
onDemandFeatureSources.add(onDemandFeatureSource);
}
break;
default:
throw Status.INTERNAL
.withDescription(
"OnDemandInput proto input field has an unexpected type: " + inputCase)
"OnDemandSource proto source field has an unexpected type: " + sourceCase)
.asRuntimeException();
}
}
}
return onDemandFeatureInputs;
return onDemandFeatureSources;
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -321,8 +321,8 @@ public ValueType serializeValuesIntoArrowIPC(List<Pair<String, List<ValueProto.V
.asRuntimeException();
}
byte[] byteData = out.toByteArray();
ByteString inputData = ByteString.copyFrom(byteData);
ValueType transformationInput = ValueType.newBuilder().setArrowValue(inputData).build();
ByteString sourceData = ByteString.copyFrom(byteData);
ValueType transformationInput = ValueType.newBuilder().setArrowValue(sourceData).build();
return transformationInput;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ public interface TransformationService {
TransformFeaturesResponse transformFeatures(TransformFeaturesRequest transformFeaturesRequest);

/**
* Extract the list of on demand feature inputs from a list of ODFV references.
* Extract the list of on demand feature sources from a list of ODFV references.
*
* @param onDemandFeatureReferences list of ODFV references to be parsed
* @return list of on demand feature inputs
* @return list of on demand feature sources
*/
List<ServingAPIProto.FeatureReferenceV2> extractOnDemandFeaturesDependencies(
List<ServingAPIProto.FeatureReferenceV2> onDemandFeatureReferences);
Expand Down
8 changes: 4 additions & 4 deletions protos/feast/core/OnDemandFeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ message OnDemandFeatureViewSpec {
// List of features specifications for each feature defined with this feature view.
repeated FeatureSpecV2 features = 3;

// Map of inputs for this feature view.
map<string, OnDemandInput> inputs = 4;
// Map of sources for this feature view.
map<string, OnDemandSource> sources = 4;

UserDefinedFunction user_defined_function = 5;

Expand All @@ -68,8 +68,8 @@ message OnDemandFeatureViewMeta {
google.protobuf.Timestamp last_updated_timestamp = 2;
}

message OnDemandInput {
oneof input {
message OnDemandSource {
oneof source {
FeatureView feature_view = 1;
FeatureViewProjection feature_view_projection = 3;
DataSource request_data_source = 2;
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ def feature_view_list(ctx: click.Context):
if isinstance(feature_view, FeatureView):
entities.update(feature_view.entities)
elif isinstance(feature_view, OnDemandFeatureView):
for backing_fv in feature_view.input_feature_view_projections.values():
for backing_fv in feature_view.source_feature_view_projections.values():
entities.update(store.get_feature_view(backing_fv.name).entities)
table.append(
[
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,7 @@ def apply(
data_sources_set_to_update.add(rfv.request_data_source)

for odfv in odfvs_to_update:
for v in odfv.input_request_data_sources.values():
for v in odfv.source_request_data_sources.values():
data_sources_set_to_update.add(v)

data_sources_to_update = list(data_sources_set_to_update)
Expand Down Expand Up @@ -1878,7 +1878,7 @@ def _get_feature_views_to_use(
odfv = od_fvs[fv_name].with_projection(copy.copy(projection))
od_fvs_to_use.append(odfv)
# Let's make sure to include an FVs which the ODFV requires Features from.
for projection in odfv.input_feature_view_projections.values():
for projection in odfv.source_feature_view_projections.values():
fv = fvs[projection.name].with_projection(copy.copy(projection))
if fv not in fvs_to_use:
fvs_to_use.append(fv)
Expand Down Expand Up @@ -2005,7 +2005,7 @@ def _group_feature_refs(
# Let's also add in any FV Feature dependencies here.
for input_fv_projection in on_demand_view_index[
view_name
].input_feature_view_projections.values():
].source_feature_view_projections.values():
for input_feat in input_fv_projection.features:
views_features[input_fv_projection.name].add(input_feat.name)
elif view_name in request_view_index:
Expand Down
Loading