Skip to content

Commit

Permalink
Allow OTLP gRPC exporters to work without grpc-java using okhttp dire…
Browse files Browse the repository at this point in the history
…ctly. (#3684)

* Allow OTLP HTTP exporter to also export in gRPC format.

* detect classpath

* Revert HTTP exporter

* Apply to metrics / trace

* Most

* Fix

* Integration tests

* Clean

* Fix log message

* Assume validated
  • Loading branch information
anuraaga authored Oct 6, 2021
1 parent 8d7cca7 commit 63bec07
Show file tree
Hide file tree
Showing 45 changed files with 1,878 additions and 717 deletions.
4 changes: 2 additions & 2 deletions dependencyManagement/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ val DEPENDENCIES = listOf(
// using old version of okhttp to avoid pulling in kotlin stdlib
// not using (old) okhttp bom because that is pulling in old guava version
// and overriding the guava bom
"com.squareup.okhttp3:okhttp:3.12.13",
"com.squareup.okhttp3:okhttp-tls:3.12.13",
"com.squareup.okhttp3:okhttp:3.14.9",
"com.squareup.okhttp3:okhttp-tls:3.14.9",
"com.sun.net.httpserver:http:20070405",
"com.tngtech.archunit:archunit-junit5:0.21.0",
"com.uber.nullaway:nullaway:0.9.2",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.otlp.internal.grpc;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.Codec;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.BoundLongCounter;
import io.opentelemetry.api.metrics.GlobalMeterProvider;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.internal.ThrottlingLogger;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* A {@link GrpcExporter} which uses the standard grpc-java library.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class DefaultGrpcExporter<T extends Marshaler> implements GrpcExporter<T> {

private static final Logger internalLogger =
Logger.getLogger(DefaultGrpcExporter.class.getName());

private final ThrottlingLogger logger = new ThrottlingLogger(internalLogger);

private final String type;
private final ManagedChannel managedChannel;
private final MarshalerServiceStub<T, ?, ?> stub;
private final long timeoutNanos;

private final BoundLongCounter seen;
private final BoundLongCounter success;
private final BoundLongCounter failed;

/** Creates a new {@link DefaultGrpcExporter}. */
DefaultGrpcExporter(
String type,
ManagedChannel channel,
MarshalerServiceStub<T, ?, ?> stub,
long timeoutNanos,
boolean compressionEnabled) {
this.type = type;
Meter meter = GlobalMeterProvider.get().get("io.opentelemetry.exporters.otlp-grpc");
Attributes attributes = Attributes.builder().put("type", type).build();
seen = meter.counterBuilder("otlp.exporter.seen").build().bind(attributes);
LongCounter exported = meter.counterBuilder("otlp.exported.exported").build();
success = exported.bind(attributes.toBuilder().put("success", true).build());
failed = exported.bind(attributes.toBuilder().put("success", false).build());

this.managedChannel = channel;
this.timeoutNanos = timeoutNanos;
Codec codec = compressionEnabled ? new Codec.Gzip() : Codec.Identity.NONE;
this.stub = stub.withCompression(codec.getMessageEncoding());
}

@Override
public CompletableResultCode export(T exportRequest, int numItems) {
seen.add(numItems);

CompletableResultCode result = new CompletableResultCode();

MarshalerServiceStub<T, ?, ?> stub = this.stub;
if (timeoutNanos > 0) {
stub = stub.withDeadlineAfter(timeoutNanos, TimeUnit.NANOSECONDS);
}
Futures.addCallback(
stub.export(exportRequest),
new FutureCallback<Object>() {
@Override
public void onSuccess(@Nullable Object unused) {
success.add(numItems);
result.succeed();
}

@Override
public void onFailure(Throwable t) {
failed.add(numItems);
Status status = Status.fromThrowable(t);
switch (status.getCode()) {
case UNIMPLEMENTED:
logger.log(
Level.SEVERE,
"Failed to export "
+ type
+ "s. Server responded with UNIMPLEMENTED. "
+ "This usually means that your collector is not configured with an otlp "
+ "receiver in the \"pipelines\" section of the configuration. "
+ "Full error message: "
+ t.getMessage());
break;
case UNAVAILABLE:
logger.log(
Level.SEVERE,
"Failed to export "
+ type
+ "s. Server is UNAVAILABLE. "
+ "Make sure your collector is running and reachable from this network. "
+ "Full error message:"
+ t.getMessage());
break;
default:
logger.log(
Level.WARNING,
"Failed to export " + type + "s. Error message: " + t.getMessage());
break;
}
if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "Failed to export " + type + "s. Details follow: " + t);
}
result.fail();
}
},
MoreExecutors.directExecutor());

return result;
}

@Override
public CompletableResultCode shutdown() {
if (managedChannel.isTerminated()) {
return CompletableResultCode.ofSuccess();
}
seen.unbind();
success.unbind();
failed.unbind();
return ManagedChannelUtil.shutdownChannel(managedChannel);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.otlp.internal.grpc;

import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;

import io.grpc.Codec;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.stub.MetadataUtils;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.annotation.Nullable;
import javax.net.ssl.SSLException;

/**
* A builder for {@link DefaultGrpcExporter}.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class DefaultGrpcExporterBuilder<T extends Marshaler>
implements GrpcExporterBuilder<T> {

private final String type;
private final Function<ManagedChannel, MarshalerServiceStub<T, ?, ?>> stubFactory;

@Nullable private ManagedChannel channel;
private long timeoutNanos;
private URI endpoint;
private boolean compressionEnabled = false;
@Nullable private Metadata metadata;
@Nullable private byte[] trustedCertificatesPem;

/** Creates a new {@link DefaultGrpcExporterBuilder}. */
DefaultGrpcExporterBuilder(
String type,
Function<ManagedChannel, MarshalerServiceStub<T, ?, ?>> stubFactory,
long defaultTimeoutSecs,
URI defaultEndpoint) {
this.type = type;
this.stubFactory = stubFactory;
timeoutNanos = TimeUnit.SECONDS.toNanos(defaultTimeoutSecs);
endpoint = defaultEndpoint;
}

@Override
public DefaultGrpcExporterBuilder<T> setChannel(ManagedChannel channel) {
this.channel = channel;
return this;
}

@Override
public DefaultGrpcExporterBuilder<T> setTimeout(long timeout, TimeUnit unit) {
timeoutNanos = unit.toNanos(timeout);
return this;
}

@Override
public DefaultGrpcExporterBuilder<T> setTimeout(Duration timeout) {
return setTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS);
}

@Override
public DefaultGrpcExporterBuilder<T> setEndpoint(String endpoint) {
URI uri;
try {
uri = new URI(endpoint);
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Invalid endpoint, must be a URL: " + endpoint, e);
}

if (uri.getScheme() == null
|| (!uri.getScheme().equals("http") && !uri.getScheme().equals("https"))) {
throw new IllegalArgumentException(
"Invalid endpoint, must start with http:// or https://: " + uri);
}

this.endpoint = uri;
return this;
}

@Override
public DefaultGrpcExporterBuilder<T> setCompression(String compressionMethod) {
this.compressionEnabled = true;
return this;
}

@Override
public DefaultGrpcExporterBuilder<T> setTrustedCertificates(byte[] trustedCertificatesPem) {
this.trustedCertificatesPem = trustedCertificatesPem;
return this;
}

@Override
public DefaultGrpcExporterBuilder<T> addHeader(String key, String value) {
if (metadata == null) {
metadata = new Metadata();
}
metadata.put(Metadata.Key.of(key, ASCII_STRING_MARSHALLER), value);
return this;
}

@Override
public GrpcExporter<T> build() {
ManagedChannel channel = this.channel;
if (channel == null) {
final ManagedChannelBuilder<?> managedChannelBuilder =
ManagedChannelBuilder.forTarget(endpoint.getAuthority());

if (endpoint.getScheme().equals("https")) {
managedChannelBuilder.useTransportSecurity();
} else {
managedChannelBuilder.usePlaintext();
}

if (metadata != null) {
managedChannelBuilder.intercept(MetadataUtils.newAttachHeadersInterceptor(metadata));
}

if (trustedCertificatesPem != null) {
try {
ManagedChannelUtil.setTrustedCertificatesPem(
managedChannelBuilder, trustedCertificatesPem);
} catch (SSLException e) {
throw new IllegalStateException(
"Could not set trusted certificates for gRPC TLS connection, are they valid "
+ "X.509 in PEM format?",
e);
}
}

channel = managedChannelBuilder.build();
}

Codec codec = compressionEnabled ? new Codec.Gzip() : Codec.Identity.NONE;
MarshalerServiceStub<T, ?, ?> stub =
stubFactory.apply(channel).withCompression(codec.getMessageEncoding());
return new DefaultGrpcExporter<>(type, channel, stub, timeoutNanos, compressionEnabled);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.otlp.internal.grpc;

import io.grpc.ManagedChannel;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.net.URI;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* An exporter of a {@link io.opentelemetry.exporter.otlp.internal.Marshaler} using the gRPC wire
* format.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public interface GrpcExporter<T extends Marshaler> {

/** Returns a new {@link GrpcExporterBuilder}. */
static <T extends Marshaler> GrpcExporterBuilder<T> builder(
String type,
long defaultTimeoutSecs,
URI defaultEndpoint,
Supplier<Function<ManagedChannel, MarshalerServiceStub<T, ?, ?>>> stubFactory,
String grpcEndpointPath) {
return GrpcExporterUtil.exporterBuilder(
type, defaultTimeoutSecs, defaultEndpoint, stubFactory, grpcEndpointPath);
}

/**
* Exports the {@code exportRequest} which is a request {@link Marshaler} for {@code numItems}
* items.
*/
CompletableResultCode export(T exportRequest, int numItems);

/** Shuts the exporter down. */
CompletableResultCode shutdown();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.otlp.internal.grpc;

import io.grpc.ManagedChannel;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

/** A builder for {@link GrpcExporter}. */
public interface GrpcExporterBuilder<T extends Marshaler> {
GrpcExporterBuilder<T> setChannel(ManagedChannel channel);

GrpcExporterBuilder<T> setTimeout(long timeout, TimeUnit unit);

GrpcExporterBuilder<T> setTimeout(Duration timeout);

GrpcExporterBuilder<T> setEndpoint(String endpoint);

GrpcExporterBuilder<T> setCompression(String compressionMethod);

GrpcExporterBuilder<T> setTrustedCertificates(byte[] trustedCertificatesPem);

GrpcExporterBuilder<T> addHeader(String key, String value);

GrpcExporter<T> build();
}
Loading

0 comments on commit 63bec07

Please sign in to comment.