Skip to content

Commit

Permalink
addressed comments
Browse files Browse the repository at this point in the history
  • Loading branch information
DNVindhya committed Jul 1, 2022
1 parent 47d8125 commit 6e713a0
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 75 deletions.
11 changes: 3 additions & 8 deletions api/src/main/java/io/grpc/GlobalInterceptors.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,24 +73,19 @@ static synchronized void setInterceptorsTracers(
isGlobalInterceptorsTracersSet = true;
}

/**
* Returns the list of global {@link ClientInterceptor}. If not set, this returns am empty list.
*/
/** Returns the list of global {@link ClientInterceptor}. If not set, this returns null. */
static synchronized List<ClientInterceptor> getClientInterceptors() {
isGlobalInterceptorsTracersGet = true;
return clientInterceptors;
}

/** Returns list of global {@link ServerInterceptor}. If not set, this returns an empty list. */
/** Returns list of global {@link ServerInterceptor}. If not set, this returns null. */
static synchronized List<ServerInterceptor> getServerInterceptors() {
isGlobalInterceptorsTracersGet = true;
return serverInterceptors;
}

/**
* Returns list of global {@link ServerStreamTracer.Factory}. If not set, this returns an empty
* list.
*/
/** Returns list of global {@link ServerStreamTracer.Factory}. If not set, this returns null. */
static synchronized List<ServerStreamTracer.Factory> getServerStreamTracerFactories() {
isGlobalInterceptorsTracersGet = true;
return serverStreamTracerFactories;
Expand Down
1 change: 1 addition & 0 deletions gcp-observability/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dependencies {
project(':grpc-alts'),
project(':grpc-census'),
("com.google.cloud:google-cloud-logging:${cloudLoggingVersion}"),
libraries.opencensus.contrib.grpc.metrics,
libraries.opencensus.exporter.stats.stackdriver,
libraries.opencensus.exporter.trace.stackdriver,
libraries.animalsniffer.annotations, // Prefer our version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.api.client.util.Strings;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.ClientInterceptor;
import io.grpc.ExperimentalApi;
Expand Down Expand Up @@ -69,22 +68,16 @@ public static synchronized GcpObservability grpcInit() throws IOException {
if (instance == null) {
GlobalLoggingTags globalLoggingTags = new GlobalLoggingTags();
ObservabilityConfigImpl observabilityConfig = ObservabilityConfigImpl.getInstance();
Sink sink =
new GcpLogSink(
observabilityConfig.getDestinationProjectId(),
globalLoggingTags.getLocationTags(),
globalLoggingTags.getCustomTags(),
observabilityConfig.getFlushMessageCount());
Sink sink = new GcpLogSink(observabilityConfig.getDestinationProjectId(),
globalLoggingTags.getLocationTags(), globalLoggingTags.getCustomTags(),
observabilityConfig.getFlushMessageCount());
// TODO(dnvindhya): Cleanup code for LoggingChannelProvider and LoggingServerProvider
// once ChannelBuilder and ServerBuilder are used
LogHelper helper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER);
ConfigFilterHelper configFilterHelper = ConfigFilterHelper.factory(observabilityConfig);
instance =
grpcInit(
sink,
observabilityConfig,
new InternalLoggingChannelInterceptor.FactoryImpl(helper, configFilterHelper),
new InternalLoggingServerInterceptor.FactoryImpl(helper, configFilterHelper));
instance = grpcInit(sink, observabilityConfig,
new InternalLoggingChannelInterceptor.FactoryImpl(helper, configFilterHelper),
new InternalLoggingServerInterceptor.FactoryImpl(helper, configFilterHelper));
instance.registerStackDriverExporter(observabilityConfig.getDestinationProjectId());
}
return instance;
Expand Down Expand Up @@ -142,27 +135,19 @@ private void setProducer(
tracerFactories.add(InternalCensusTracingAccessor.getServerStreamTracerFactory());
}

if (!clientInterceptors.isEmpty()
|| !serverInterceptors.isEmpty()
|| !tracerFactories.isEmpty()) {
InternalGlobalInterceptors.setInterceptorsTracers(
clientInterceptors, serverInterceptors, tracerFactories);
}
InternalGlobalInterceptors.setInterceptorsTracers(
clientInterceptors, serverInterceptors, tracerFactories);
}

private void registerStackDriverExporter(String projectId) throws IOException {
if (config.isEnableCloudMonitoring()) {
RpcViews.registerAllGrpcViews();
StackdriverStatsConfiguration.Builder statsConfigurationBuilder =
StackdriverStatsConfiguration.builder();
if (!Strings.isNullOrEmpty(projectId)) {
if (projectId != null) {
statsConfigurationBuilder.setProjectId(projectId);
}
try {
StackdriverStatsExporter.createAndRegister(statsConfigurationBuilder.build());
} catch (IOException e) {
throw new IOException("Failed to register Stackdriver stats exporter, " + e.getMessage());
}
StackdriverStatsExporter.createAndRegister(statsConfigurationBuilder.build());
metricsEnabled = true;
}

Expand All @@ -172,14 +157,10 @@ private void registerStackDriverExporter(String projectId) throws IOException {
traceConfig.getActiveTraceParams().toBuilder().setSampler(config.getSampler()).build());
StackdriverTraceConfiguration.Builder traceConfigurationBuilder =
StackdriverTraceConfiguration.builder();
if (!Strings.isNullOrEmpty(projectId)) {
if (projectId != null) {
traceConfigurationBuilder.setProjectId(projectId);
}
try {
StackdriverTraceExporter.createAndRegister(traceConfigurationBuilder.build());
} catch (IOException e) {
throw new IOException("Failed to register Stackdriver trace exporter, " + e.getMessage());
}
StackdriverTraceExporter.createAndRegister(traceConfigurationBuilder.build());
tracesEnabled = true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,15 @@ private void parseConfig(Map<String, ?> config) {
if (enableCloudTracing && samplingRate == null) {
this.sampler = Samplers.probabilitySampler(0.0);
}
double epsilon = 1e-6;
if (samplingRate != null) {
checkArgument(
samplingRate >= 0.0 && samplingRate <= 1.0,
"'global_trace_sampling_rate' needs to be between [0.0, 1.0]");
if (samplingRate == 1.0) {
// Using alwaysSample() instead of probabilitySampler() because according to
// {@link io.opencensus.trace.samplers.ProbabilitySampler#shouldSample}
// there is a (very) small chance of *not* sampling if probability = 1.00.
if (Math.abs(1 - samplingRate) < epsilon) {
this.sampler = Samplers.alwaysSample();
} else {
this.sampler = Samplers.probabilitySampler(samplingRate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,21 @@
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.InternalGlobalInterceptors;
// import io.grpc.ManagedChannelProvider;
import io.grpc.ManagedChannelProvider;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
// import io.grpc.ServerProvider;
import io.grpc.ServerProvider;
import io.grpc.StaticTestingClassLoader;
import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor;
import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor;
import io.grpc.gcp.observability.logging.Sink;
import io.opencensus.trace.samplers.Samplers;
import java.io.IOException;
import java.util.regex.Pattern;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -48,51 +51,77 @@
@RunWith(JUnit4.class)
public class GcpObservabilityTest {

private static final String PROJECT_ID = "project";
private final StaticTestingClassLoader classLoader =
new StaticTestingClassLoader(
getClass().getClassLoader(),
Pattern.compile(
"io\\.grpc\\.InternalGlobalInterceptors|io\\.grpc\\.GlobalInterceptors|"
+ "io\\.grpc\\.gcp\\.observability\\.[^.]+|"
+ "io\\.grpc\\.gcp\\.observability\\.interceptors\\.[^.]+|"
+ "io\\.grpc\\.gcp\\.observability\\.GcpObservabilityTest\\$.*"));

@Test
public void initFinish() throws Exception {
ManagedChannelProvider prevChannelProvider = ManagedChannelProvider.provider();
ServerProvider prevServerProvider = ServerProvider.provider();
Sink sink = mock(Sink.class);
ObservabilityConfig config = mock(ObservabilityConfig.class);
InternalLoggingChannelInterceptor.Factory channelInterceptorFactory =
mock(InternalLoggingChannelInterceptor.Factory.class);
InternalLoggingServerInterceptor.Factory serverInterceptorFactory =
mock(InternalLoggingServerInterceptor.Factory.class);
GcpObservability observability1;
try (GcpObservability observability =
GcpObservability.grpcInit(
sink, config, channelInterceptorFactory, serverInterceptorFactory)) {
assertThat(ManagedChannelProvider.provider()).isInstanceOf(LoggingChannelProvider.class);
assertThat(ServerProvider.provider()).isInstanceOf(ServerProvider.class);
observability1 =
GcpObservability.grpcInit(
sink, config, channelInterceptorFactory, serverInterceptorFactory);
assertThat(observability1).isSameInstanceAs(observability);
}
verify(sink).close();
assertThat(ManagedChannelProvider.provider()).isSameInstanceAs(prevChannelProvider);
assertThat(ServerProvider.provider()).isSameInstanceAs(prevServerProvider);
try {
observability1.close();
fail("should have failed for calling close() second time");
} catch (IllegalStateException e) {
assertThat(e).hasMessageThat().contains("GcpObservability already closed!");
}
Class<?> runnable =
classLoader.loadClass(StaticTestingClassInitFinish.class.getName());
((Runnable) runnable.getDeclaredConstructor().newInstance()).run();
}

@Test
public void enableObservability() throws Exception {
StaticTestingClassLoader classLoader =
new StaticTestingClassLoader(
getClass().getClassLoader(), Pattern.compile("io\\.grpc\\.[^.]+"));
Class<?> runnable = classLoader.loadClass(StaticTestingClassLoaderSet.class.getName());
Class<?> runnable =
classLoader.loadClass(StaticTestingClassEnableObservability.class.getName());
((Runnable) runnable.getDeclaredConstructor().newInstance()).run();
}

@Test
public void disableObservability() throws Exception {
Class<?> runnable =
classLoader.loadClass(StaticTestingClassDisableObservability.class.getName());
((Runnable) runnable.getDeclaredConstructor().newInstance()).run();
}

// UsedReflectively
public static final class StaticTestingClassLoaderSet implements Runnable {
public static final class StaticTestingClassInitFinish implements Runnable {

@Override
public void run() {
ManagedChannelProvider prevChannelProvider = ManagedChannelProvider.provider();
ServerProvider prevServerProvider = ServerProvider.provider();
Sink sink = mock(Sink.class);
ObservabilityConfig config = mock(ObservabilityConfig.class);
InternalLoggingChannelInterceptor.Factory channelInterceptorFactory =
mock(InternalLoggingChannelInterceptor.Factory.class);
InternalLoggingServerInterceptor.Factory serverInterceptorFactory =
mock(InternalLoggingServerInterceptor.Factory.class);
GcpObservability observability1;
try {
GcpObservability observability =
GcpObservability.grpcInit(
sink, config, channelInterceptorFactory, serverInterceptorFactory);
assertThat(ManagedChannelProvider.provider()).isInstanceOf(LoggingChannelProvider.class);
assertThat(ServerProvider.provider()).isInstanceOf(ServerProvider.class);
observability1 =
GcpObservability.grpcInit(
sink, config, channelInterceptorFactory, serverInterceptorFactory);
assertThat(observability1).isSameInstanceAs(observability);
observability.close();
verify(sink).close();
assertThat(ManagedChannelProvider.provider()).isSameInstanceAs(prevChannelProvider);
assertThat(ServerProvider.provider()).isSameInstanceAs(prevServerProvider);
try {
observability1.close();
fail("should have failed for calling close() second time");
} catch (IllegalStateException e) {
assertThat(e).hasMessageThat().contains("GcpObservability already closed!");
}
} catch (IOException e) {
fail("Encountered exception: " + e);
}
}
}

public static final class StaticTestingClassEnableObservability implements Runnable {

@Override
public void run() {
Expand All @@ -102,7 +131,6 @@ public void run() {
when(config.isEnableCloudMonitoring()).thenReturn(true);
when(config.isEnableCloudTracing()).thenReturn(true);
when(config.getSampler()).thenReturn(Samplers.neverSample());
when(config.getDestinationProjectId()).thenReturn(PROJECT_ID);

ClientInterceptor clientInterceptor =
mock(ClientInterceptor.class, delegatesTo(new NoopClientInterceptor()));
Expand All @@ -128,6 +156,35 @@ public void run() {
}
}

public static final class StaticTestingClassDisableObservability implements Runnable {

@Override
public void run() {
Sink sink = mock(Sink.class);
ObservabilityConfig config = mock(ObservabilityConfig.class);
when(config.isEnableCloudLogging()).thenReturn(false);
when(config.isEnableCloudMonitoring()).thenReturn(false);
when(config.isEnableCloudTracing()).thenReturn(false);
when(config.getSampler()).thenReturn(Samplers.neverSample());

InternalLoggingChannelInterceptor.Factory channelInterceptorFactory =
mock(InternalLoggingChannelInterceptor.Factory.class);
InternalLoggingServerInterceptor.Factory serverInterceptorFactory =
mock(InternalLoggingServerInterceptor.Factory.class);;

try (GcpObservability observability =
GcpObservability.grpcInit(
sink, config, channelInterceptorFactory, serverInterceptorFactory)) {
assertThat(InternalGlobalInterceptors.getClientInterceptors()).isEmpty();
assertThat(InternalGlobalInterceptors.getServerInterceptors()).isEmpty();
assertThat(InternalGlobalInterceptors.getServerStreamTracerFactories()).isEmpty();
} catch (Exception e) {
fail("Encountered exception: " + e);
}
verify(sink).close();
}
}

private static class NoopClientInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
Expand Down

0 comments on commit 6e713a0

Please sign in to comment.