Skip to content

Commit

Permalink
gcp-observability: Populate global interceptors from observability (g…
Browse files Browse the repository at this point in the history
…rpc#9309)

* Populate global interceptors from observability and added stackdriver exporters
  • Loading branch information
DNVindhya authored Jul 14, 2022
1 parent 49f5551 commit ef89bd3
Show file tree
Hide file tree
Showing 7 changed files with 336 additions and 119 deletions.
11 changes: 3 additions & 8 deletions api/src/main/java/io/grpc/GlobalInterceptors.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,24 +73,19 @@ static synchronized void setInterceptorsTracers(
isGlobalInterceptorsTracersSet = true;
}

/**
* Returns the list of global {@link ClientInterceptor}. If not set, this returns am empty list.
*/
/** Returns the list of global {@link ClientInterceptor}. If not set, this returns null. */
static synchronized List<ClientInterceptor> getClientInterceptors() {
isGlobalInterceptorsTracersGet = true;
return clientInterceptors;
}

/** Returns list of global {@link ServerInterceptor}. If not set, this returns an empty list. */
/** Returns list of global {@link ServerInterceptor}. If not set, this returns null. */
static synchronized List<ServerInterceptor> getServerInterceptors() {
isGlobalInterceptorsTracersGet = true;
return serverInterceptors;
}

/**
* Returns list of global {@link ServerStreamTracer.Factory}. If not set, this returns an empty
* list.
*/
/** Returns list of global {@link ServerStreamTracer.Factory}. If not set, this returns null. */
static synchronized List<ServerStreamTracer.Factory> getServerStreamTracerFactories() {
isGlobalInterceptorsTracersGet = true;
return serverStreamTracerFactories;
Expand Down
4 changes: 3 additions & 1 deletion gcp-observability/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dependencies {
project(':grpc-alts'),
project(':grpc-census'),
("com.google.cloud:google-cloud-logging:${cloudLoggingVersion}"),
libraries.opencensus.contrib.grpc.metrics,
libraries.opencensus.exporter.stats.stackdriver,
libraries.opencensus.exporter.trace.stackdriver,
libraries.animalsniffer.annotations, // Prefer our version
Expand All @@ -41,7 +42,8 @@ dependencies {

runtimeOnly libraries.opencensus.impl

testImplementation project(':grpc-testing'),
testImplementation project(':grpc-context').sourceSets.test.output,
project(':grpc-testing'),
project(':grpc-testing-proto'),
project(':grpc-netty-shaded')
testImplementation (libraries.guava.testlib) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,45 @@
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.ClientInterceptor;
import io.grpc.ExperimentalApi;
import io.grpc.InternalGlobalInterceptors;
import io.grpc.ManagedChannelProvider.ProviderNotFoundException;
import io.grpc.ServerInterceptor;
import io.grpc.ServerStreamTracer;
import io.grpc.census.InternalCensusStatsAccessor;
import io.grpc.census.InternalCensusTracingAccessor;
import io.grpc.gcp.observability.interceptors.ConfigFilterHelper;
import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor;
import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor;
import io.grpc.gcp.observability.interceptors.LogHelper;
import io.grpc.gcp.observability.logging.GcpLogSink;
import io.grpc.gcp.observability.logging.Sink;
import io.grpc.internal.TimeProvider;
import io.opencensus.contrib.grpc.metrics.RpcViews;
import io.opencensus.exporter.stats.stackdriver.StackdriverStatsConfiguration;
import io.opencensus.exporter.stats.stackdriver.StackdriverStatsExporter;
import io.opencensus.exporter.trace.stackdriver.StackdriverTraceConfiguration;
import io.opencensus.exporter.trace.stackdriver.StackdriverTraceExporter;
import io.opencensus.trace.Tracing;
import io.opencensus.trace.config.TraceConfig;
import java.io.IOException;
import java.util.ArrayList;
import java.util.logging.Level;
import java.util.logging.Logger;

/** The main class for gRPC Google Cloud Platform Observability features. */
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8869")
public final class GcpObservability implements AutoCloseable {
private static final Logger logger = Logger.getLogger(GcpObservability.class.getName());
private static GcpObservability instance = null;
private final Sink sink;
private final ObservabilityConfig config;
private final ArrayList<ClientInterceptor> clientInterceptors = new ArrayList<>();
private final ArrayList<ServerInterceptor> serverInterceptors = new ArrayList<>();
private final ArrayList<ServerStreamTracer.Factory> tracerFactories = new ArrayList<>();
private boolean metricsEnabled;
private boolean tracesEnabled;

/**
* Initialize grpc-observability.
Expand All @@ -48,20 +71,33 @@ public static synchronized GcpObservability grpcInit() throws IOException {
Sink sink = new GcpLogSink(observabilityConfig.getDestinationProjectId(),
globalLoggingTags.getLocationTags(), globalLoggingTags.getCustomTags(),
observabilityConfig.getFlushMessageCount());
// TODO(dnvindhya): Cleanup code for LoggingChannelProvider and LoggingServerProvider
// once ChannelBuilder and ServerBuilder are used
LogHelper helper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER);
ConfigFilterHelper configFilterHelper = ConfigFilterHelper.factory(observabilityConfig);
instance = grpcInit(sink,
instance = grpcInit(sink, observabilityConfig,
new InternalLoggingChannelInterceptor.FactoryImpl(helper, configFilterHelper),
new InternalLoggingServerInterceptor.FactoryImpl(helper, configFilterHelper));
instance.registerStackDriverExporter(observabilityConfig.getDestinationProjectId());
}
return instance;
}

@VisibleForTesting static GcpObservability grpcInit(Sink sink,
@VisibleForTesting
static GcpObservability grpcInit(
Sink sink,
ObservabilityConfig config,
InternalLoggingChannelInterceptor.Factory channelInterceptorFactory,
InternalLoggingServerInterceptor.Factory serverInterceptorFactory) {
InternalLoggingServerInterceptor.Factory serverInterceptorFactory)
throws IOException {
if (instance == null) {
instance = new GcpObservability(sink, channelInterceptorFactory, serverInterceptorFactory);
instance =
new GcpObservability(sink, config, channelInterceptorFactory, serverInterceptorFactory);
LogHelper logHelper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER);
ConfigFilterHelper logFilterHelper = ConfigFilterHelper.factory(config);
instance.setProducer(
new InternalLoggingChannelInterceptor.FactoryImpl(logHelper, logFilterHelper),
new InternalLoggingServerInterceptor.FactoryImpl(logHelper, logFilterHelper));
}
return instance;
}
Expand All @@ -73,17 +109,92 @@ public void close() {
if (instance == null) {
throw new IllegalStateException("GcpObservability already closed!");
}
unRegisterStackDriverExporter();
LoggingChannelProvider.shutdown();
LoggingServerProvider.shutdown();
sink.close();
instance = null;
}
}

private GcpObservability(Sink sink,
private void setProducer(
InternalLoggingChannelInterceptor.Factory channelInterceptorFactory,
InternalLoggingServerInterceptor.Factory serverInterceptorFactory) {
if (config.isEnableCloudLogging()) {
clientInterceptors.add(channelInterceptorFactory.create());
serverInterceptors.add(serverInterceptorFactory.create());
}
if (config.isEnableCloudMonitoring()) {
clientInterceptors.add(
InternalCensusStatsAccessor.getClientInterceptor(true, true, true, true));
tracerFactories.add(
InternalCensusStatsAccessor.getServerStreamTracerFactory(true, true, true));
}
if (config.isEnableCloudTracing()) {
clientInterceptors.add(InternalCensusTracingAccessor.getClientInterceptor());
tracerFactories.add(InternalCensusTracingAccessor.getServerStreamTracerFactory());
}

InternalGlobalInterceptors.setInterceptorsTracers(
clientInterceptors, serverInterceptors, tracerFactories);
}

private void registerStackDriverExporter(String projectId) throws IOException {
if (config.isEnableCloudMonitoring()) {
RpcViews.registerAllGrpcViews();
StackdriverStatsConfiguration.Builder statsConfigurationBuilder =
StackdriverStatsConfiguration.builder();
if (projectId != null) {
statsConfigurationBuilder.setProjectId(projectId);
}
StackdriverStatsExporter.createAndRegister(statsConfigurationBuilder.build());
metricsEnabled = true;
}

if (config.isEnableCloudTracing()) {
TraceConfig traceConfig = Tracing.getTraceConfig();
traceConfig.updateActiveTraceParams(
traceConfig.getActiveTraceParams().toBuilder().setSampler(config.getSampler()).build());
StackdriverTraceConfiguration.Builder traceConfigurationBuilder =
StackdriverTraceConfiguration.builder();
if (projectId != null) {
traceConfigurationBuilder.setProjectId(projectId);
}
StackdriverTraceExporter.createAndRegister(traceConfigurationBuilder.build());
tracesEnabled = true;
}
}

private void unRegisterStackDriverExporter() {
if (metricsEnabled) {
try {
StackdriverStatsExporter.unregister();
} catch (IllegalStateException e) {
logger.log(
Level.SEVERE, "Failed to unregister Stackdriver stats exporter, " + e.getMessage());
}
metricsEnabled = false;
}

if (tracesEnabled) {
try {
StackdriverTraceExporter.unregister();
} catch (IllegalStateException e) {
logger.log(
Level.SEVERE, "Failed to unregister Stackdriver trace exporter, " + e.getMessage());
}
tracesEnabled = false;
}
}

private GcpObservability(
Sink sink,
ObservabilityConfig config,
InternalLoggingChannelInterceptor.Factory channelInterceptorFactory,
InternalLoggingServerInterceptor.Factory serverInterceptorFactory) {
this.sink = checkNotNull(sink);
this.config = checkNotNull(config);

LoggingChannelProvider.init(checkNotNull(channelInterceptorFactory));
LoggingServerProvider.init(checkNotNull(serverInterceptorFactory));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.grpc.Internal;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import io.opencensus.trace.Sampler;
import java.util.List;

@Internal
Expand All @@ -43,6 +44,7 @@ public interface ObservabilityConfig {
/** Get event types to log. */
List<EventType> getEventTypes();

/** Get sampler for TraceConfig - when Cloud Tracing is enabled. */
Sampler getSampler();

/**
Expand Down Expand Up @@ -71,34 +73,4 @@ public LogFilter(String pattern, Integer headerBytes, Integer messageBytes) {
this.messageBytes = messageBytes;
}
}

/** Corresponds to a {@link io.opencensus.trace.Sampler} type. */
enum SamplerType {
ALWAYS,
NEVER,
PROBABILISTIC;
}

/** Represents a trace {@link io.opencensus.trace.Sampler} configuration. */
class Sampler {
private SamplerType type;
private double probability;

Sampler(double probability) {
this.probability = probability;
this.type = SamplerType.PROBABILISTIC;
}

Sampler(SamplerType type) {
this.type = type;
}

double getProbability() {
return probability;
}

SamplerType getType() {
return type;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.grpc.internal.JsonParser;
import io.grpc.internal.JsonUtil;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import io.opencensus.trace.Sampler;
import io.opencensus.trace.samplers.Samplers;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
Expand All @@ -35,6 +37,8 @@
final class ObservabilityConfigImpl implements ObservabilityConfig {
private static final String CONFIG_ENV_VAR_NAME = "GRPC_CONFIG_OBSERVABILITY";
private static final String CONFIG_FILE_ENV_VAR_NAME = "GRPC_CONFIG_OBSERVABILITY_JSON";
// Tolerance for floating-point comparisons.
private static final double EPSILON = 1e-6;

private boolean enableCloudLogging = false;
private boolean enableCloudMonitoring = false;
Expand Down Expand Up @@ -100,19 +104,21 @@ private void parseConfig(Map<String, ?> config) {
}
this.eventTypes = eventTypesBuilder.build();
}
String sampler = JsonUtil.getString(config, "global_trace_sampler");
Double samplingRate = JsonUtil.getNumberAsDouble(config, "global_trace_sampling_rate");
checkArgument(
sampler == null || samplingRate == null,
"only one of 'global_trace_sampler' or 'global_trace_sampling_rate' can be specified");
if (sampler != null) {
this.sampler = new Sampler(SamplerType.valueOf(sampler.toUpperCase()));
}
if (samplingRate != null) {
if (samplingRate == null) {
this.sampler = Samplers.probabilitySampler(0.0);
} else {
checkArgument(
samplingRate >= 0.0 && samplingRate <= 1.0,
"'global_trace_sampling_rate' needs to be between 0.0 and 1.0");
this.sampler = new Sampler(samplingRate);
"'global_trace_sampling_rate' needs to be between [0.0, 1.0]");
// Using alwaysSample() instead of probabilitySampler() because according to
// {@link io.opencensus.trace.samplers.ProbabilitySampler#shouldSample}
// there is a (very) small chance of *not* sampling if probability = 1.00.
if (1 - samplingRate < EPSILON) {
this.sampler = Samplers.alwaysSample();
} else {
this.sampler = Samplers.probabilitySampler(samplingRate);
}
}
}
}
Expand Down
Loading

0 comments on commit ef89bd3

Please sign in to comment.