Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow OTLP gRPC exporters to work without grpc-java using okhttp directly. #3684

Merged
merged 13 commits into from
Oct 6, 2021
4 changes: 2 additions & 2 deletions dependencyManagement/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ val DEPENDENCIES = listOf(
// using old version of okhttp to avoid pulling in kotlin stdlib
// not using (old) okhttp bom because that is pulling in old guava version
// and overriding the guava bom
"com.squareup.okhttp3:okhttp:3.12.13",
"com.squareup.okhttp3:okhttp-tls:3.12.13",
"com.squareup.okhttp3:okhttp:3.14.9",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We were accidentally not using the latest version (.12.13 appeared newer but was actually a backport of a fix to older okhttp3)

"com.squareup.okhttp3:okhttp-tls:3.14.9",
"com.sun.net.httpserver:http:20070405",
"com.tngtech.archunit:archunit-junit5:0.21.0",
"com.uber.nullaway:nullaway:0.9.2",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.otlp.internal.grpc;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.Codec;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.BoundLongCounter;
import io.opentelemetry.api.metrics.GlobalMeterProvider;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.internal.ThrottlingLogger;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* A {@link GrpcExporter} which uses the standard grpc-java library.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class DefaultGrpcExporter<T extends Marshaler> implements GrpcExporter<T> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Open to any naming suggestions. On the bright side, the actual exporters don't need to worry about the two types, they just use GrpcExporter.builder().


private static final Logger internalLogger =
Logger.getLogger(DefaultGrpcExporter.class.getName());

private final ThrottlingLogger logger = new ThrottlingLogger(internalLogger);

private final String type;
private final ManagedChannel managedChannel;
private final MarshalerServiceStub<T, ?, ?> stub;
private final long timeoutNanos;

private final BoundLongCounter seen;
private final BoundLongCounter success;
private final BoundLongCounter failed;

/** Creates a new {@link DefaultGrpcExporter}. */
DefaultGrpcExporter(
String type,
ManagedChannel channel,
MarshalerServiceStub<T, ?, ?> stub,
long timeoutNanos,
boolean compressionEnabled) {
this.type = type;
Meter meter = GlobalMeterProvider.get().get("io.opentelemetry.exporters.otlp-grpc");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to call out in the changelog that the name of these metrics are changing? The instrumentation library will change, and also the metric name will change from io.opentelemetry.exporters.otlp to io.opentelemetry.exporters.otlp-grpc.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good call, although this shouldn't change the name of any metrics (unless you have a custom exporter that appends the meter name to the metric name, I guess?). I would be slightly surprised if anyone was actually using these metrics yet, but in case they are, it is definitely reasonable to call it out.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed the meter names too, instead of having the type in the meter name it's an attribute now. The camelcase before also didn't seem conventional. I suspect calling out in release notes while making the change is good here - the instrumentation library change isn't that important so I could revert it, but given the metric name change anyways, I figured it's good to more easily differentiate metrics from -grpc, -grpc-okhttp, -http

Attributes attributes = Attributes.builder().put("type", type).build();
seen = meter.counterBuilder("otlp.exporter.seen").build().bind(attributes);
LongCounter exported = meter.counterBuilder("otlp.exported.exported").build();
success = exported.bind(attributes.toBuilder().put("success", true).build());
failed = exported.bind(attributes.toBuilder().put("success", false).build());

this.managedChannel = channel;
this.timeoutNanos = timeoutNanos;
Codec codec = compressionEnabled ? new Codec.Gzip() : Codec.Identity.NONE;
this.stub = stub.withCompression(codec.getMessageEncoding());
}

@Override
public CompletableResultCode export(T exportRequest, int numItems) {
seen.add(numItems);

CompletableResultCode result = new CompletableResultCode();

MarshalerServiceStub<T, ?, ?> stub = this.stub;
if (timeoutNanos > 0) {
stub = stub.withDeadlineAfter(timeoutNanos, TimeUnit.NANOSECONDS);
}
Futures.addCallback(
stub.export(exportRequest),
new FutureCallback<Object>() {
@Override
public void onSuccess(@Nullable Object unused) {
success.add(numItems);
result.succeed();
}

@Override
public void onFailure(Throwable t) {
failed.add(numItems);
Status status = Status.fromThrowable(t);
switch (status.getCode()) {
case UNIMPLEMENTED:
logger.log(
Level.SEVERE,
"Failed to export "
+ type
+ "s. Server responded with UNIMPLEMENTED. "
+ "This usually means that your collector is not configured with an otlp "
+ "receiver in the \"pipelines\" section of the configuration. "
+ "Full error message: "
+ t.getMessage());
break;
case UNAVAILABLE:
logger.log(
Level.SEVERE,
"Failed to export "
+ type
+ "s. Server is UNAVAILABLE. "
+ "Make sure your collector is running and reachable from this network. "
+ "Full error message:"
+ t.getMessage());
break;
default:
logger.log(
Level.WARNING,
"Failed to export " + type + "s. Error message: " + t.getMessage());
break;
}
if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "Failed to export " + type + "s. Details follow: " + t);
}
result.fail();
}
},
MoreExecutors.directExecutor());

return result;
}

@Override
public CompletableResultCode shutdown() {
if (managedChannel.isTerminated()) {
return CompletableResultCode.ofSuccess();
}
seen.unbind();
success.unbind();
failed.unbind();
return ManagedChannelUtil.shutdownChannel(managedChannel);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.otlp.internal.grpc;

import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;
import static io.opentelemetry.api.internal.Utils.checkArgument;
import static java.util.Objects.requireNonNull;

import io.grpc.Codec;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.stub.MetadataUtils;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.annotation.Nullable;
import javax.net.ssl.SSLException;

/**
* A builder for {@link DefaultGrpcExporter}.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class DefaultGrpcExporterBuilder<T extends Marshaler>
implements GrpcExporterBuilder<T> {

private final String type;
private final Function<ManagedChannel, MarshalerServiceStub<T, ?, ?>> stubFactory;

@Nullable private ManagedChannel channel;
private long timeoutNanos;
private URI endpoint;
private boolean compressionEnabled = false;
@Nullable private Metadata metadata;
@Nullable private byte[] trustedCertificatesPem;

/** Creates a new {@link DefaultGrpcExporterBuilder}. */
DefaultGrpcExporterBuilder(
String type,
Function<ManagedChannel, MarshalerServiceStub<T, ?, ?>> stubFactory,
long defaultTimeoutSecs,
URI defaultEndpoint) {
this.type = type;
this.stubFactory = stubFactory;
timeoutNanos = TimeUnit.SECONDS.toNanos(defaultTimeoutSecs);
endpoint = defaultEndpoint;
}

@Override
public DefaultGrpcExporterBuilder<T> setChannel(ManagedChannel channel) {
this.channel = channel;
return this;
}

@Override
public DefaultGrpcExporterBuilder<T> setTimeout(long timeout, TimeUnit unit) {
requireNonNull(unit, "unit");
checkArgument(timeout >= 0, "timeout must be non-negative");
timeoutNanos = unit.toNanos(timeout);
return this;
}

@Override
public DefaultGrpcExporterBuilder<T> setTimeout(Duration timeout) {
requireNonNull(timeout, "timeout");
return setTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS);
}

@Override
public DefaultGrpcExporterBuilder<T> setEndpoint(String endpoint) {
requireNonNull(endpoint, "endpoint");

URI uri;
try {
uri = new URI(endpoint);
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Invalid endpoint, must be a URL: " + endpoint, e);
}

if (uri.getScheme() == null
|| (!uri.getScheme().equals("http") && !uri.getScheme().equals("https"))) {
throw new IllegalArgumentException(
"Invalid endpoint, must start with http:// or https://: " + uri);
}

this.endpoint = uri;
return this;
}

@Override
public DefaultGrpcExporterBuilder<T> setCompression(String compressionMethod) {
requireNonNull(compressionMethod, "compressionMethod");
checkArgument(
compressionMethod.equals("gzip") || compressionMethod.equals("none"),
"Unsupported compression method. Supported compression methods include: gzip, none.");
this.compressionEnabled = true;
return this;
}

@Override
public DefaultGrpcExporterBuilder<T> setTrustedCertificates(byte[] trustedCertificatesPem) {
this.trustedCertificatesPem = trustedCertificatesPem;
return this;
}

@Override
public DefaultGrpcExporterBuilder<T> addHeader(String key, String value) {
if (metadata == null) {
metadata = new Metadata();
}
metadata.put(Metadata.Key.of(key, ASCII_STRING_MARSHALLER), value);
return this;
}

@Override
public GrpcExporter<T> build() {
ManagedChannel channel = this.channel;
if (channel == null) {
final ManagedChannelBuilder<?> managedChannelBuilder =
ManagedChannelBuilder.forTarget(endpoint.getAuthority());

if (endpoint.getScheme().equals("https")) {
managedChannelBuilder.useTransportSecurity();
} else {
managedChannelBuilder.usePlaintext();
}

if (metadata != null) {
managedChannelBuilder.intercept(MetadataUtils.newAttachHeadersInterceptor(metadata));
}

if (trustedCertificatesPem != null) {
try {
ManagedChannelUtil.setTrustedCertificatesPem(
managedChannelBuilder, trustedCertificatesPem);
} catch (SSLException e) {
throw new IllegalStateException(
"Could not set trusted certificates for gRPC TLS connection, are they valid "
+ "X.509 in PEM format?",
e);
}
}

channel = managedChannelBuilder.build();
}

Codec codec = compressionEnabled ? new Codec.Gzip() : Codec.Identity.NONE;
MarshalerServiceStub<T, ?, ?> stub =
stubFactory.apply(channel).withCompression(codec.getMessageEncoding());
return new DefaultGrpcExporter<>(type, channel, stub, timeoutNanos, compressionEnabled);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.otlp.internal.grpc;

import io.grpc.ManagedChannel;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.net.URI;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* An exporter of a {@link io.opentelemetry.exporter.otlp.internal.Marshaler} using the gRPC wire
* format.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public interface GrpcExporter<T extends Marshaler> {

/** Returns a new {@link GrpcExporterBuilder}. */
static <T extends Marshaler> GrpcExporterBuilder<T> builder(
String type,
long defaultTimeoutSecs,
URI defaultEndpoint,
Supplier<Function<ManagedChannel, MarshalerServiceStub<T, ?, ?>>> stubFactory,
String grpcEndpointPath) {
return GrpcExporterUtil.exporterBuilder(
type, defaultTimeoutSecs, defaultEndpoint, stubFactory, grpcEndpointPath);
}

/**
* Exports the {@code exportRequest} which is a request {@link Marshaler} for {@code numItems}
* items.
*/
CompletableResultCode export(T exportRequest, int numItems);

/** Shuts the exporter down. */
CompletableResultCode shutdown();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.otlp.internal.grpc;

import io.grpc.ManagedChannel;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

/** A builder for {@link GrpcExporter}. */
public interface GrpcExporterBuilder<T extends Marshaler> {
GrpcExporterBuilder<T> setChannel(ManagedChannel channel);

GrpcExporterBuilder<T> setTimeout(long timeout, TimeUnit unit);

GrpcExporterBuilder<T> setTimeout(Duration timeout);

GrpcExporterBuilder<T> setEndpoint(String endpoint);

GrpcExporterBuilder<T> setCompression(String compressionMethod);

GrpcExporterBuilder<T> setTrustedCertificates(byte[] trustedCertificatesPem);

GrpcExporterBuilder<T> addHeader(String key, String value);

GrpcExporter<T> build();
}
Loading