Skip to content

Commit

Permalink
Apply grpc tracing interceptor on online serving (#1242)
Browse files Browse the repository at this point in the history
Signed-off-by: Khor Shu Heng <[email protected]>

Co-authored-by: Khor Shu Heng <[email protected]>
  • Loading branch information
khorshuheng and khorshuheng authored Dec 23, 2020
1 parent 17edb99 commit 04d2b47
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 103 deletions.
13 changes: 9 additions & 4 deletions serving/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -155,21 +155,26 @@
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
<!--compile 'io.jaegertracing:jaeger-client:0.31.0'-->
<!--compile 'io.jaegertracing:jaeger-client:1.3.2'-->
<dependency>
<groupId>io.jaegertracing</groupId>
<artifactId>jaeger-client</artifactId>
<version>0.31.0</version>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>io.opentracing</groupId>
<artifactId>opentracing-api</artifactId>
<version>0.31.0</version>
<version>0.33.0</version>
</dependency>
<dependency>
<groupId>io.opentracing</groupId>
<artifactId>opentracing-noop</artifactId>
<version>0.31.0</version>
<version>0.33.0</version>
</dependency>
<dependency>
<groupId>io.opentracing.contrib</groupId>
<artifactId>opentracing-grpc</artifactId>
<version>0.2.3</version>
</dependency>

<!-- The client -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package feast.serving.config;

import io.opentracing.Tracer;
import io.opentracing.contrib.grpc.TracingServerInterceptor;
import io.opentracing.noop.NoopTracerFactory;
import io.prometheus.client.exporter.MetricsServlet;
import io.prometheus.client.hotspot.DefaultExports;
Expand Down Expand Up @@ -54,4 +55,9 @@ public Tracer tracer() {
return io.jaegertracing.Configuration.fromEnv(feastProperties.getTracing().getServiceName())
.getTracer();
}

@Bean
public TracingServerInterceptor tracingInterceptor(Tracer tracer) {
return TracingServerInterceptor.newBuilder().withTracer(tracer).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,21 @@
import feast.serving.util.RequestHelper;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.contrib.grpc.TracingServerInterceptor;
import net.devh.boot.grpc.server.service.GrpcService;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.access.AccessDeniedException;
import org.springframework.security.core.context.SecurityContextHolder;

@GrpcService(interceptors = {GrpcMessageInterceptor.class, GrpcMonitoringInterceptor.class})
@GrpcService(
interceptors = {
TracingServerInterceptor.class,
GrpcMessageInterceptor.class,
GrpcMonitoringInterceptor.class
})
public class ServingServiceGRpcController extends ServingServiceImplBase {

private static final Logger log =
Expand Down Expand Up @@ -75,16 +80,19 @@ public void getFeastServingInfo(
public void getOnlineFeaturesV2(
ServingAPIProto.GetOnlineFeaturesRequestV2 request,
StreamObserver<GetOnlineFeaturesResponse> responseObserver) {
Span span = tracer.buildSpan("getOnlineFeaturesV2").start();
try (Scope scope = tracer.scopeManager().activate(span, false)) {
try {
// authorize for the project in request object.
if (request.getProject() != null && !request.getProject().isEmpty()) {
// project set at root level overrides the project set at feature table level
this.authorizationService.authorizeRequest(
SecurityContextHolder.getContext(), request.getProject());
}
RequestHelper.validateOnlineRequest(request);
Span span = tracer.buildSpan("getOnlineFeaturesV2").start();
GetOnlineFeaturesResponse onlineFeatures = servingServiceV2.getOnlineFeatures(request);
if (span != null) {
span.finish();
}
responseObserver.onNext(onlineFeatures);
responseObserver.onCompleted();
} catch (SpecRetrievalException e) {
Expand All @@ -102,6 +110,5 @@ public void getOnlineFeaturesV2(
log.warn("Failed to get Online Features", e);
responseObserver.onError(e);
}
span.finish();
}
}
194 changes: 100 additions & 94 deletions serving/src/main/java/feast/serving/service/OnlineServingServiceV2.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import feast.storage.api.retriever.Feature;
import feast.storage.api.retriever.OnlineRetrieverV2;
import io.grpc.Status;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;
import java.util.*;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -71,105 +71,111 @@ public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequestV2 re
projectName = "default";
}

try (Scope scope = tracer.buildSpan("getOnlineFeaturesV2").startActive(true)) {
List<GetOnlineFeaturesRequestV2.EntityRow> entityRows = request.getEntityRowsList();
// Collect the feature/entity value for each entity row in entityValueMap
Map<GetOnlineFeaturesRequestV2.EntityRow, Map<String, ValueProto.Value>> entityValuesMap =
entityRows.stream().collect(Collectors.toMap(row -> row, row -> new HashMap<>()));
// Collect the feature/entity status metadata for each entity row in entityValueMap
Map<GetOnlineFeaturesRequestV2.EntityRow, Map<String, GetOnlineFeaturesResponse.FieldStatus>>
entityStatusesMap =
entityRows.stream().collect(Collectors.toMap(row -> row, row -> new HashMap<>()));

entityRows.forEach(
entityRow -> {
Map<String, ValueProto.Value> valueMap = entityRow.getFieldsMap();
entityValuesMap.get(entityRow).putAll(valueMap);
entityStatusesMap.get(entityRow).putAll(getMetadataMap(valueMap, false, false));
});

List<List<Optional<Feature>>> entityRowsFeatures =
retriever.getOnlineFeatures(projectName, entityRows, featureReferences);

if (entityRowsFeatures.size() != entityRows.size()) {
throw Status.INTERNAL
.withDescription(
"The no. of FeatureRow obtained from OnlineRetriever"
+ "does not match no. of entityRow passed.")
.asRuntimeException();
}
List<GetOnlineFeaturesRequestV2.EntityRow> entityRows = request.getEntityRowsList();
// Collect the feature/entity value for each entity row in entityValueMap
Map<GetOnlineFeaturesRequestV2.EntityRow, Map<String, ValueProto.Value>> entityValuesMap =
entityRows.stream().collect(Collectors.toMap(row -> row, row -> new HashMap<>()));
// Collect the feature/entity status metadata for each entity row in entityValueMap
Map<GetOnlineFeaturesRequestV2.EntityRow, Map<String, GetOnlineFeaturesResponse.FieldStatus>>
entityStatusesMap =
entityRows.stream().collect(Collectors.toMap(row -> row, row -> new HashMap<>()));

entityRows.forEach(
entityRow -> {
Map<String, ValueProto.Value> valueMap = entityRow.getFieldsMap();
entityValuesMap.get(entityRow).putAll(valueMap);
entityStatusesMap.get(entityRow).putAll(getMetadataMap(valueMap, false, false));
});

for (int i = 0; i < entityRows.size(); i++) {
GetOnlineFeaturesRequestV2.EntityRow entityRow = entityRows.get(i);
List<Optional<Feature>> curEntityRowFeatures = entityRowsFeatures.get(i);

Map<FeatureReferenceV2, Optional<Feature>> featureReferenceFeatureMap =
getFeatureRefFeatureMap(curEntityRowFeatures);

Map<String, ValueProto.Value> allValueMaps = new HashMap<>();
Map<String, GetOnlineFeaturesResponse.FieldStatus> allStatusMaps = new HashMap<>();

for (FeatureReferenceV2 featureReference : featureReferences) {
if (featureReferenceFeatureMap.containsKey(featureReference)) {
Optional<Feature> feature = featureReferenceFeatureMap.get(featureReference);

FeatureTableSpec featureTableSpec =
specService.getFeatureTableSpec(projectName, feature.get().getFeatureReference());
FeatureProto.FeatureSpecV2 featureSpec =
specService.getFeatureSpec(projectName, feature.get().getFeatureReference());
ValueProto.ValueType.Enum valueTypeEnum = featureSpec.getValueType();
ValueProto.Value.ValCase valueCase = feature.get().getFeatureValue().getValCase();
boolean isMatchingFeatureSpec = checkSameFeatureSpec(valueTypeEnum, valueCase);

boolean isOutsideMaxAge = checkOutsideMaxAge(featureTableSpec, entityRow, feature);
Map<String, ValueProto.Value> valueMap =
unpackValueMap(feature, isOutsideMaxAge, isMatchingFeatureSpec);
allValueMaps.putAll(valueMap);

// Generate metadata for feature values and merge into entityFieldsMap
Map<String, GetOnlineFeaturesResponse.FieldStatus> statusMap =
getMetadataMap(valueMap, !isMatchingFeatureSpec, isOutsideMaxAge);
allStatusMaps.putAll(statusMap);

// Populate metrics/log request
populateCountMetrics(statusMap, projectName);
} else {
Map<String, ValueProto.Value> valueMap =
new HashMap<>() {
{
put(
FeatureV2.getFeatureStringRef(featureReference),
ValueProto.Value.newBuilder().build());
}
};
allValueMaps.putAll(valueMap);
Span onlineRetrievalSpan = tracer.buildSpan("onlineRetrieval").start();
if (onlineRetrievalSpan != null) {
onlineRetrievalSpan.setTag("entities", entityRows.size());
onlineRetrievalSpan.setTag("features", featureReferences.size());
}
List<List<Optional<Feature>>> entityRowsFeatures =
retriever.getOnlineFeatures(projectName, entityRows, featureReferences);
if (onlineRetrievalSpan != null) {
onlineRetrievalSpan.finish();
}

Map<String, GetOnlineFeaturesResponse.FieldStatus> statusMap =
getMetadataMap(valueMap, true, false);
allStatusMaps.putAll(statusMap);
if (entityRowsFeatures.size() != entityRows.size()) {
throw Status.INTERNAL
.withDescription(
"The no. of FeatureRow obtained from OnlineRetriever"
+ "does not match no. of entityRow passed.")
.asRuntimeException();
}

// Populate metrics/log request
populateCountMetrics(statusMap, projectName);
}
for (int i = 0; i < entityRows.size(); i++) {
GetOnlineFeaturesRequestV2.EntityRow entityRow = entityRows.get(i);
List<Optional<Feature>> curEntityRowFeatures = entityRowsFeatures.get(i);

Map<FeatureReferenceV2, Optional<Feature>> featureReferenceFeatureMap =
getFeatureRefFeatureMap(curEntityRowFeatures);

Map<String, ValueProto.Value> allValueMaps = new HashMap<>();
Map<String, GetOnlineFeaturesResponse.FieldStatus> allStatusMaps = new HashMap<>();

for (FeatureReferenceV2 featureReference : featureReferences) {
if (featureReferenceFeatureMap.containsKey(featureReference)) {
Optional<Feature> feature = featureReferenceFeatureMap.get(featureReference);

FeatureTableSpec featureTableSpec =
specService.getFeatureTableSpec(projectName, feature.get().getFeatureReference());
FeatureProto.FeatureSpecV2 featureSpec =
specService.getFeatureSpec(projectName, feature.get().getFeatureReference());
ValueProto.ValueType.Enum valueTypeEnum = featureSpec.getValueType();
ValueProto.Value.ValCase valueCase = feature.get().getFeatureValue().getValCase();
boolean isMatchingFeatureSpec = checkSameFeatureSpec(valueTypeEnum, valueCase);

boolean isOutsideMaxAge = checkOutsideMaxAge(featureTableSpec, entityRow, feature);
Map<String, ValueProto.Value> valueMap =
unpackValueMap(feature, isOutsideMaxAge, isMatchingFeatureSpec);
allValueMaps.putAll(valueMap);

// Generate metadata for feature values and merge into entityFieldsMap
Map<String, GetOnlineFeaturesResponse.FieldStatus> statusMap =
getMetadataMap(valueMap, !isMatchingFeatureSpec, isOutsideMaxAge);
allStatusMaps.putAll(statusMap);

// Populate metrics/log request
populateCountMetrics(statusMap, projectName);
} else {
Map<String, ValueProto.Value> valueMap =
new HashMap<>() {
{
put(
FeatureV2.getFeatureStringRef(featureReference),
ValueProto.Value.newBuilder().build());
}
};
allValueMaps.putAll(valueMap);

Map<String, GetOnlineFeaturesResponse.FieldStatus> statusMap =
getMetadataMap(valueMap, true, false);
allStatusMaps.putAll(statusMap);

// Populate metrics/log request
populateCountMetrics(statusMap, projectName);
}
entityValuesMap.get(entityRow).putAll(allValueMaps);
entityStatusesMap.get(entityRow).putAll(allStatusMaps);
}

// Build response field values from entityValuesMap and entityStatusesMap
// Response field values should be in the same order as the entityRows provided by the user.
List<GetOnlineFeaturesResponse.FieldValues> fieldValuesList =
entityRows.stream()
.map(
entityRow -> {
return GetOnlineFeaturesResponse.FieldValues.newBuilder()
.putAllFields(entityValuesMap.get(entityRow))
.putAllStatuses(entityStatusesMap.get(entityRow))
.build();
})
.collect(Collectors.toList());
return GetOnlineFeaturesResponse.newBuilder().addAllFieldValues(fieldValuesList).build();
entityValuesMap.get(entityRow).putAll(allValueMaps);
entityStatusesMap.get(entityRow).putAll(allStatusMaps);
}

// Build response field values from entityValuesMap and entityStatusesMap
// Response field values should be in the same order as the entityRows provided by the user.
List<GetOnlineFeaturesResponse.FieldValues> fieldValuesList =
entityRows.stream()
.map(
entityRow -> {
return GetOnlineFeaturesResponse.FieldValues.newBuilder()
.putAllFields(entityValuesMap.get(entityRow))
.putAllStatuses(entityStatusesMap.get(entityRow))
.build();
})
.collect(Collectors.toList());
return GetOnlineFeaturesResponse.newBuilder().addAllFieldValues(fieldValuesList).build();
}

private boolean checkSameFeatureSpec(
Expand Down

0 comments on commit 04d2b47

Please sign in to comment.