Skip to content

Commit

Permalink
gcp-observability: implement exclusion of cloud backend RPCs for all …
Browse files Browse the repository at this point in the history
…3 signals (#9427) (#9436)

* gcp-observability: implement exclusion of cloud backend RPCs for all 3 signals
by using a ConditionalClientInterceptor that conditionally delegates
  • Loading branch information
sanjaypujare authored Aug 10, 2022
1 parent 7bdca0c commit db3dd01
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import io.grpc.ClientInterceptor;
import io.grpc.ExperimentalApi;
import io.grpc.InternalGlobalInterceptors;
Expand All @@ -27,6 +28,7 @@
import io.grpc.ServerStreamTracer;
import io.grpc.census.InternalCensusStatsAccessor;
import io.grpc.census.InternalCensusTracingAccessor;
import io.grpc.gcp.observability.interceptors.ConditionalClientInterceptor;
import io.grpc.gcp.observability.interceptors.ConfigFilterHelper;
import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor;
import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor;
Expand Down Expand Up @@ -57,6 +59,9 @@
public final class GcpObservability implements AutoCloseable {
private static final Logger logger = Logger.getLogger(GcpObservability.class.getName());
private static final int METRICS_EXPORT_INTERVAL = 30;
private static final ImmutableSet<String> SERVICES_TO_EXCLUDE = ImmutableSet.of(
"google.logging.v2.LoggingServiceV2", "google.monitoring.v3.MetricService",
"google.devtools.cloudtrace.v2.TraceService");
private static GcpObservability instance = null;
private final Sink sink;
private final ObservabilityConfig config;
Expand All @@ -77,7 +82,7 @@ public static synchronized GcpObservability grpcInit() throws IOException {
ObservabilityConfigImpl observabilityConfig = ObservabilityConfigImpl.getInstance();
Sink sink = new GcpLogSink(observabilityConfig.getDestinationProjectId(),
globalLocationTags.getLocationTags(), observabilityConfig.getCustomTags(),
observabilityConfig.getFlushMessageCount());
observabilityConfig.getFlushMessageCount(), SERVICES_TO_EXCLUDE);
LogHelper helper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER);
ConfigFilterHelper configFilterHelper = ConfigFilterHelper.factory(observabilityConfig);
instance = grpcInit(sink, observabilityConfig,
Expand Down Expand Up @@ -126,20 +131,26 @@ private void setProducer(
serverInterceptors.add(serverInterceptorFactory.create());
}
if (config.isEnableCloudMonitoring()) {
clientInterceptors.add(
InternalCensusStatsAccessor.getClientInterceptor(true, true, true, true));
clientInterceptors.add(getConditionalInterceptor(
InternalCensusStatsAccessor.getClientInterceptor(true, true, true, true)));
tracerFactories.add(
InternalCensusStatsAccessor.getServerStreamTracerFactory(true, true, true));
}
if (config.isEnableCloudTracing()) {
clientInterceptors.add(InternalCensusTracingAccessor.getClientInterceptor());
clientInterceptors.add(
getConditionalInterceptor(InternalCensusTracingAccessor.getClientInterceptor()));
tracerFactories.add(InternalCensusTracingAccessor.getServerStreamTracerFactory());
}

InternalGlobalInterceptors.setInterceptorsTracers(
clientInterceptors, serverInterceptors, tracerFactories);
}

static ConditionalClientInterceptor getConditionalInterceptor(ClientInterceptor interceptor) {
return new ConditionalClientInterceptor(interceptor,
(m, c) -> !SERVICES_TO_EXCLUDE.contains(m.getServiceName()));
}

@VisibleForTesting
void registerStackDriverExporter(String projectId, Map<String, String> customTags)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.gcp.observability.interceptors;

import static com.google.common.base.Preconditions.checkNotNull;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.Internal;
import io.grpc.MethodDescriptor;
import java.util.function.BiPredicate;

/**
* A client interceptor that conditionally calls a delegated interceptor.
*/
@Internal
public final class ConditionalClientInterceptor implements ClientInterceptor {

private final ClientInterceptor delegate;
private final BiPredicate<MethodDescriptor<?, ?>, CallOptions> predicate;

public ConditionalClientInterceptor(ClientInterceptor delegate,
BiPredicate<MethodDescriptor<?, ?>, CallOptions> predicate) {
this.delegate = checkNotNull(delegate, "delegate");
this.predicate = checkNotNull(predicate, "predicate");
}

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel next) {
if (!predicate.test(method, callOptions)) {
return next.newCall(method, callOptions);
}
return delegate.interceptCall(method, callOptions, next);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.grpc.gcp.observability.logging;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.cloud.MonitoredResource;
import com.google.cloud.logging.LogEntry;
import com.google.cloud.logging.Logging;
Expand All @@ -31,6 +33,7 @@
import io.grpc.internal.JsonParser;
import io.grpc.observabilitylog.v1.GrpcLogRecord;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
Expand All @@ -45,8 +48,6 @@
public class GcpLogSink implements Sink {
private final Logger logger = Logger.getLogger(GcpLogSink.class.getName());

// TODO(DNVindhya): Make cloud logging service a configurable value
private static final String SERVICE_TO_EXCLUDE = "google.logging.v2.LoggingServiceV2";
private static final String DEFAULT_LOG_NAME =
"microservices.googleapis.com%2Fobservability%2Fgrpc";
private static final String K8S_MONITORED_RESOURCE_TYPE = "k8s_container";
Expand All @@ -62,26 +63,29 @@ public class GcpLogSink implements Sink {
* logging APIs also uses gRPC. */
private volatile Logging gcpLoggingClient;
private long flushCounter;
private final Collection<String> servicesToExclude;

@VisibleForTesting
GcpLogSink(Logging loggingClient, String destinationProjectId, Map<String, String> locationTags,
Map<String, String> customTags, Long flushLimit) {
this(destinationProjectId, locationTags, customTags, flushLimit);
Map<String, String> customTags, Long flushLimit, Collection<String> servicesToExclude) {
this(destinationProjectId, locationTags, customTags, flushLimit, servicesToExclude);
this.gcpLoggingClient = loggingClient;
}

/**
* Retrieves a single instance of GcpLogSink.
*
* @param destinationProjectId cloud project id to write logs
* @param servicesToExclude service names for which log entries should not be generated
*/
public GcpLogSink(String destinationProjectId, Map<String, String> locationTags,
Map<String, String> customTags, Long flushLimit) {
Map<String, String> customTags, Long flushLimit, Collection<String> servicesToExclude) {
this.projectId = destinationProjectId;
this.customTags = getCustomTags(customTags, locationTags, destinationProjectId);
this.kubernetesResource = getResource(locationTags);
this.flushLimit = flushLimit != null ? flushLimit : FALLBACK_FLUSH_LIMIT;
this.flushCounter = 0L;
this.servicesToExclude = checkNotNull(servicesToExclude, "servicesToExclude");
}

/**
Expand All @@ -98,7 +102,7 @@ public void write(GrpcLogRecord logProto) {
}
}
}
if (SERVICE_TO_EXCLUDE.equals(logProto.getServiceName())) {
if (servicesToExclude.contains(logProto.getServiceName())) {
return;
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

import io.grpc.CallOptions;
Expand All @@ -34,11 +37,13 @@
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.StaticTestingClassLoader;
import io.grpc.gcp.observability.interceptors.ConditionalClientInterceptor;
import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor;
import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor;
import io.grpc.gcp.observability.logging.Sink;
import io.opencensus.trace.samplers.Samplers;
import java.io.IOException;
import java.util.List;
import java.util.regex.Pattern;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -77,6 +82,61 @@ public void disableObservability() throws Exception {
((Runnable) runnable.getDeclaredConstructor().newInstance()).run();
}

@Test
@SuppressWarnings("unchecked")
public void conditionalInterceptor() {
ClientInterceptor delegate = mock(ClientInterceptor.class);
Channel channel = mock(Channel.class);
ClientCall<?, ?> returnedCall = mock(ClientCall.class);

ConditionalClientInterceptor conditionalClientInterceptor
= GcpObservability.getConditionalInterceptor(
delegate);
MethodDescriptor<?, ?> method = MethodDescriptor.newBuilder()
.setType(MethodDescriptor.MethodType.UNARY)
.setFullMethodName("google.logging.v2.LoggingServiceV2/method")
.setRequestMarshaller(mock(MethodDescriptor.Marshaller.class))
.setResponseMarshaller(mock(MethodDescriptor.Marshaller.class))
.build();
doReturn(returnedCall).when(channel).newCall(method, CallOptions.DEFAULT);
ClientCall<?, ?> clientCall = conditionalClientInterceptor.interceptCall(method,
CallOptions.DEFAULT, channel);
verifyNoInteractions(delegate);
assertThat(clientCall).isSameInstanceAs(returnedCall);

method = MethodDescriptor.newBuilder().setType(MethodDescriptor.MethodType.UNARY)
.setFullMethodName("google.monitoring.v3.MetricService/method2")
.setRequestMarshaller(mock(MethodDescriptor.Marshaller.class))
.setResponseMarshaller(mock(MethodDescriptor.Marshaller.class))
.build();
doReturn(returnedCall).when(channel).newCall(method, CallOptions.DEFAULT);
clientCall = conditionalClientInterceptor.interceptCall(method, CallOptions.DEFAULT, channel);
verifyNoInteractions(delegate);
assertThat(clientCall).isSameInstanceAs(returnedCall);

method = MethodDescriptor.newBuilder().setType(MethodDescriptor.MethodType.UNARY)
.setFullMethodName("google.devtools.cloudtrace.v2.TraceService/method3")
.setRequestMarshaller(mock(MethodDescriptor.Marshaller.class))
.setResponseMarshaller(mock(MethodDescriptor.Marshaller.class))
.build();
doReturn(returnedCall).when(channel).newCall(method, CallOptions.DEFAULT);
clientCall = conditionalClientInterceptor.interceptCall(method, CallOptions.DEFAULT, channel);
verifyNoInteractions(delegate);
assertThat(clientCall).isSameInstanceAs(returnedCall);

reset(channel);
ClientCall<?, ?> interceptedCall = mock(ClientCall.class);
method = MethodDescriptor.newBuilder().setType(MethodDescriptor.MethodType.UNARY)
.setFullMethodName("some.other.random.service/method4")
.setRequestMarshaller(mock(MethodDescriptor.Marshaller.class))
.setResponseMarshaller(mock(MethodDescriptor.Marshaller.class))
.build();
doReturn(interceptedCall).when(delegate).interceptCall(method, CallOptions.DEFAULT, channel);
clientCall = conditionalClientInterceptor.interceptCall(method, CallOptions.DEFAULT, channel);
verifyNoInteractions(channel);
assertThat(clientCall).isSameInstanceAs(interceptedCall);
}

// UsedReflectively
public static final class StaticTestingClassInitFinish implements Runnable {

Expand Down Expand Up @@ -137,7 +197,10 @@ public void run() {
try (GcpObservability unused =
GcpObservability.grpcInit(
sink, config, channelInterceptorFactory, serverInterceptorFactory)) {
assertThat(InternalGlobalInterceptors.getClientInterceptors()).hasSize(3);
List<ClientInterceptor> list = InternalGlobalInterceptors.getClientInterceptors();
assertThat(list).hasSize(3);
assertThat(list.get(1)).isInstanceOf(ConditionalClientInterceptor.class);
assertThat(list.get(2)).isInstanceOf(ConditionalClientInterceptor.class);
assertThat(InternalGlobalInterceptors.getServerInterceptors()).hasSize(1);
assertThat(InternalGlobalInterceptors.getServerStreamTracerFactories()).hasSize(2);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.testing.protobuf.SimpleServiceGrpc;
import java.io.IOException;
import java.util.Collections;
import java.util.regex.Pattern;
import org.junit.ClassRule;
import org.junit.Ignore;
Expand Down Expand Up @@ -110,7 +111,9 @@ public static final class StaticTestingClassEndtoEndLogging implements Runnable

@Override
public void run() {
Sink sink = new GcpLogSink(PROJECT_ID, LOCATION_TAGS, CUSTOM_TAGS, FLUSH_LIMIT);
Sink sink =
new GcpLogSink(
PROJECT_ID, LOCATION_TAGS, CUSTOM_TAGS, FLUSH_LIMIT, Collections.emptySet());
ObservabilityConfig config = mock(ObservabilityConfig.class);
LogHelper spyLogHelper = spy(new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER));
ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.gcp.observability.interceptors;

import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
import java.util.function.BiPredicate;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

/**
* Tests for {@link ConditionalClientInterceptor}.
*/
@RunWith(JUnit4.class)
public class ConditionalClientInterceptorTest {

private ConditionalClientInterceptor conditionalClientInterceptor;
@Mock private ClientInterceptor delegate;
@Mock private BiPredicate<MethodDescriptor<?, ?>, CallOptions> predicate;
@Mock private Channel channel;
@Mock private ClientCall<?, ?> returnedCall;
private MethodDescriptor<?, ?> method;

@Before
@SuppressWarnings("unchecked")
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
conditionalClientInterceptor = new ConditionalClientInterceptor(
delegate, predicate);
method = MethodDescriptor.newBuilder().setType(MethodType.UNARY)
.setFullMethodName("service/method")
.setRequestMarshaller(mock(MethodDescriptor.Marshaller.class))
.setResponseMarshaller(mock(MethodDescriptor.Marshaller.class))
.build();
}

@Test
@SuppressWarnings("unchecked")
public void predicateFalse() {
when(predicate.test(any(MethodDescriptor.class), any(CallOptions.class))).thenReturn(false);
doReturn(returnedCall).when(channel).newCall(method, CallOptions.DEFAULT);
ClientCall<?, ?> clientCall = conditionalClientInterceptor.interceptCall(method,
CallOptions.DEFAULT, channel);
assertThat(clientCall).isSameInstanceAs(returnedCall);
verify(delegate, never()).interceptCall(any(MethodDescriptor.class), any(CallOptions.class),
any(Channel.class));
}

@Test
@SuppressWarnings("unchecked")
public void predicateTrue() {
when(predicate.test(any(MethodDescriptor.class), any(CallOptions.class))).thenReturn(true);
doReturn(returnedCall).when(delegate).interceptCall(method, CallOptions.DEFAULT, channel);
ClientCall<?, ?> clientCall = conditionalClientInterceptor.interceptCall(method,
CallOptions.DEFAULT, channel);
assertThat(clientCall).isSameInstanceAs(returnedCall);
verify(channel, never()).newCall(any(MethodDescriptor.class), any(CallOptions.class));
}
}
Loading

0 comments on commit db3dd01

Please sign in to comment.