-
Notifications
You must be signed in to change notification settings - Fork 848
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Anuraag Agrawal
committed
Oct 4, 2021
1 parent
08fda86
commit 7929de0
Showing
18 changed files
with
1,037 additions
and
221 deletions.
There are no files selected for viewing
142 changes: 142 additions & 0 deletions
142
...ommon/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/DefaultGrpcExporter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,142 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.exporter.otlp.internal.grpc; | ||
|
||
import com.google.common.util.concurrent.FutureCallback; | ||
import com.google.common.util.concurrent.Futures; | ||
import com.google.common.util.concurrent.MoreExecutors; | ||
import io.grpc.Codec; | ||
import io.grpc.ManagedChannel; | ||
import io.grpc.Status; | ||
import io.opentelemetry.api.common.Attributes; | ||
import io.opentelemetry.api.metrics.BoundLongCounter; | ||
import io.opentelemetry.api.metrics.GlobalMeterProvider; | ||
import io.opentelemetry.api.metrics.LongCounter; | ||
import io.opentelemetry.api.metrics.Meter; | ||
import io.opentelemetry.exporter.otlp.internal.Marshaler; | ||
import io.opentelemetry.sdk.common.CompletableResultCode; | ||
import io.opentelemetry.sdk.internal.ThrottlingLogger; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.logging.Level; | ||
import java.util.logging.Logger; | ||
import org.checkerframework.checker.nullness.qual.Nullable; | ||
|
||
/** | ||
* A {@link GrpcExporter} which uses the standard grpc-java library. | ||
* | ||
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change | ||
* at any time. | ||
*/ | ||
public final class DefaultGrpcExporter<T extends Marshaler> implements GrpcExporter<T> { | ||
|
||
private static final Logger internalLogger = | ||
Logger.getLogger(DefaultGrpcExporter.class.getName()); | ||
|
||
private final ThrottlingLogger logger = new ThrottlingLogger(internalLogger); | ||
|
||
private final String type; | ||
private final ManagedChannel managedChannel; | ||
private final MarshalerServiceStub<T, ?, ?> stub; | ||
private final long timeoutNanos; | ||
|
||
private final BoundLongCounter seen; | ||
private final BoundLongCounter success; | ||
private final BoundLongCounter failed; | ||
|
||
/** Creates a new {@link DefaultGrpcExporter}. */ | ||
public DefaultGrpcExporter( | ||
String type, | ||
ManagedChannel channel, | ||
MarshalerServiceStub<T, ?, ?> stub, | ||
long timeoutNanos, | ||
boolean compressionEnabled) { | ||
this.type = type; | ||
Meter meter = GlobalMeterProvider.get().get("io.opentelemetry.exporters.otlp-grpc"); | ||
Attributes attributes = Attributes.builder().put("type", type).build(); | ||
seen = meter.counterBuilder("otlp.exporter.seen").build().bind(attributes); | ||
LongCounter exported = meter.counterBuilder("otlp.exported.exported").build(); | ||
success = exported.bind(attributes.toBuilder().put("success", true).build()); | ||
failed = exported.bind(attributes.toBuilder().put("success", false).build()); | ||
|
||
this.managedChannel = channel; | ||
this.timeoutNanos = timeoutNanos; | ||
Codec codec = compressionEnabled ? new Codec.Gzip() : Codec.Identity.NONE; | ||
this.stub = stub.withCompression(codec.getMessageEncoding()); | ||
} | ||
|
||
@Override | ||
public CompletableResultCode export(T exportRequest, int numItems) { | ||
seen.add(numItems); | ||
|
||
CompletableResultCode result = new CompletableResultCode(); | ||
|
||
MarshalerServiceStub<T, ?, ?> stub = this.stub; | ||
if (timeoutNanos > 0) { | ||
stub = stub.withDeadlineAfter(timeoutNanos, TimeUnit.NANOSECONDS); | ||
} | ||
Futures.addCallback( | ||
stub.export(exportRequest), | ||
new FutureCallback<Object>() { | ||
@Override | ||
public void onSuccess(@Nullable Object unused) { | ||
success.add(numItems); | ||
result.succeed(); | ||
} | ||
|
||
@Override | ||
public void onFailure(Throwable t) { | ||
failed.add(numItems); | ||
Status status = Status.fromThrowable(t); | ||
switch (status.getCode()) { | ||
case UNIMPLEMENTED: | ||
logger.log( | ||
Level.SEVERE, | ||
"Failed to export " | ||
+ type | ||
+ "s. Server responded with UNIMPLEMENTED. " | ||
+ "This usually means that your collector is not configured with an otlp " | ||
+ "receiver in the \"pipelines\" section of the configuration. " | ||
+ "Full error message: " | ||
+ t.getMessage()); | ||
break; | ||
case UNAVAILABLE: | ||
logger.log( | ||
Level.SEVERE, | ||
"Failed to export " | ||
+ type | ||
+ "s. Server is UNAVAILABLE. " | ||
+ "Make sure your collector is running and reachable from this network. " | ||
+ "Full error message:" | ||
+ t.getMessage()); | ||
break; | ||
default: | ||
logger.log( | ||
Level.WARNING, | ||
"Failed to export " + type + "s. Error message: " + t.getMessage()); | ||
break; | ||
} | ||
if (logger.isLoggable(Level.FINEST)) { | ||
logger.log(Level.FINEST, "Failed to export " + type + "s. Details follow: " + t); | ||
} | ||
result.fail(); | ||
} | ||
}, | ||
MoreExecutors.directExecutor()); | ||
|
||
return result; | ||
} | ||
|
||
@Override | ||
public CompletableResultCode shutdown() { | ||
if (managedChannel.isTerminated()) { | ||
return CompletableResultCode.ofSuccess(); | ||
} | ||
seen.unbind(); | ||
success.unbind(); | ||
failed.unbind(); | ||
return ManagedChannelUtil.shutdownChannel(managedChannel); | ||
} | ||
} |
160 changes: 160 additions & 0 deletions
160
...rc/main/java/io/opentelemetry/exporter/otlp/internal/grpc/DefaultGrpcExporterBuilder.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,160 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.exporter.otlp.internal.grpc; | ||
|
||
import static io.grpc.Metadata.ASCII_STRING_MARSHALLER; | ||
import static io.opentelemetry.api.internal.Utils.checkArgument; | ||
import static java.util.Objects.requireNonNull; | ||
|
||
import io.grpc.Codec; | ||
import io.grpc.ManagedChannel; | ||
import io.grpc.ManagedChannelBuilder; | ||
import io.grpc.Metadata; | ||
import io.grpc.stub.MetadataUtils; | ||
import io.opentelemetry.exporter.otlp.internal.Marshaler; | ||
import java.net.URI; | ||
import java.net.URISyntaxException; | ||
import java.time.Duration; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.function.Function; | ||
import javax.annotation.Nullable; | ||
import javax.net.ssl.SSLException; | ||
|
||
/** | ||
* A builder for {@link DefaultGrpcExporter}. | ||
* | ||
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change | ||
* at any time. | ||
*/ | ||
public final class DefaultGrpcExporterBuilder<T extends Marshaler> | ||
implements GrpcExporterBuilder<T> { | ||
|
||
private final String type; | ||
private final Function<ManagedChannel, MarshalerServiceStub<T, ?, ?>> stubFactory; | ||
|
||
@Nullable private ManagedChannel channel; | ||
private long timeoutNanos; | ||
private URI endpoint; | ||
private boolean compressionEnabled = false; | ||
@Nullable private Metadata metadata; | ||
@Nullable private byte[] trustedCertificatesPem; | ||
|
||
/** Creates a new {@link DefaultGrpcExporterBuilder}. */ | ||
public DefaultGrpcExporterBuilder( | ||
String type, | ||
Function<ManagedChannel, MarshalerServiceStub<T, ?, ?>> stubFactory, | ||
long defaultTimeoutSecs, | ||
URI defaultEndpoint) { | ||
this.type = type; | ||
this.stubFactory = stubFactory; | ||
timeoutNanos = TimeUnit.SECONDS.toNanos(defaultTimeoutSecs); | ||
endpoint = defaultEndpoint; | ||
} | ||
|
||
@Override | ||
public DefaultGrpcExporterBuilder<T> setChannel(ManagedChannel channel) { | ||
this.channel = channel; | ||
return this; | ||
} | ||
|
||
@Override | ||
public DefaultGrpcExporterBuilder<T> setTimeout(long timeout, TimeUnit unit) { | ||
requireNonNull(unit, "unit"); | ||
checkArgument(timeout >= 0, "timeout must be non-negative"); | ||
timeoutNanos = unit.toNanos(timeout); | ||
return this; | ||
} | ||
|
||
@Override | ||
public DefaultGrpcExporterBuilder<T> setTimeout(Duration timeout) { | ||
requireNonNull(timeout, "timeout"); | ||
return setTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS); | ||
} | ||
|
||
@Override | ||
public DefaultGrpcExporterBuilder<T> setEndpoint(String endpoint) { | ||
requireNonNull(endpoint, "endpoint"); | ||
|
||
URI uri; | ||
try { | ||
uri = new URI(endpoint); | ||
} catch (URISyntaxException e) { | ||
throw new IllegalArgumentException("Invalid endpoint, must be a URL: " + endpoint, e); | ||
} | ||
|
||
if (uri.getScheme() == null | ||
|| (!uri.getScheme().equals("http") && !uri.getScheme().equals("https"))) { | ||
throw new IllegalArgumentException( | ||
"Invalid endpoint, must start with http:// or https://: " + uri); | ||
} | ||
|
||
this.endpoint = uri; | ||
return this; | ||
} | ||
|
||
@Override | ||
public DefaultGrpcExporterBuilder<T> setCompression(String compressionMethod) { | ||
requireNonNull(compressionMethod, "compressionMethod"); | ||
checkArgument( | ||
compressionMethod.equals("gzip"), | ||
"Unsupported compression method. Supported compression methods include: gzip."); | ||
this.compressionEnabled = true; | ||
return this; | ||
} | ||
|
||
@Override | ||
public DefaultGrpcExporterBuilder<T> setTrustedCertificates(byte[] trustedCertificatesPem) { | ||
this.trustedCertificatesPem = trustedCertificatesPem; | ||
return this; | ||
} | ||
|
||
@Override | ||
public DefaultGrpcExporterBuilder<T> addHeader(String key, String value) { | ||
if (metadata == null) { | ||
metadata = new Metadata(); | ||
} | ||
metadata.put(Metadata.Key.of(key, ASCII_STRING_MARSHALLER), value); | ||
return this; | ||
} | ||
|
||
@Override | ||
public GrpcExporter<T> build() { | ||
ManagedChannel channel = this.channel; | ||
if (channel == null) { | ||
final ManagedChannelBuilder<?> managedChannelBuilder = | ||
ManagedChannelBuilder.forTarget(endpoint.getAuthority()); | ||
|
||
if (endpoint.getScheme().equals("https")) { | ||
managedChannelBuilder.useTransportSecurity(); | ||
} else { | ||
managedChannelBuilder.usePlaintext(); | ||
} | ||
|
||
if (metadata != null) { | ||
managedChannelBuilder.intercept(MetadataUtils.newAttachHeadersInterceptor(metadata)); | ||
} | ||
|
||
if (trustedCertificatesPem != null) { | ||
try { | ||
ManagedChannelUtil.setTrustedCertificatesPem( | ||
managedChannelBuilder, trustedCertificatesPem); | ||
} catch (SSLException e) { | ||
throw new IllegalStateException( | ||
"Could not set trusted certificates for gRPC TLS connection, are they valid " | ||
+ "X.509 in PEM format?", | ||
e); | ||
} | ||
} | ||
|
||
channel = managedChannelBuilder.build(); | ||
} | ||
|
||
Codec codec = compressionEnabled ? new Codec.Gzip() : Codec.Identity.NONE; | ||
MarshalerServiceStub<T, ?, ?> stub = | ||
stubFactory.apply(channel).withCompression(codec.getMessageEncoding()); | ||
return new DefaultGrpcExporter<>(type, channel, stub, timeoutNanos, compressionEnabled); | ||
} | ||
} |
28 changes: 28 additions & 0 deletions
28
.../otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/GrpcExporter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.exporter.otlp.internal.grpc; | ||
|
||
import io.opentelemetry.exporter.otlp.internal.Marshaler; | ||
import io.opentelemetry.sdk.common.CompletableResultCode; | ||
|
||
/** | ||
* An exporter of a {@link io.opentelemetry.exporter.otlp.internal.Marshaler} using the gRPC wire | ||
* format. | ||
* | ||
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change | ||
* at any time. | ||
*/ | ||
public interface GrpcExporter<T extends Marshaler> { | ||
|
||
/** | ||
* Exports the {@code exportRequest} which is a request {@link Marshaler} for {@code numItems} | ||
* items. | ||
*/ | ||
CompletableResultCode export(T exportRequest, int numItems); | ||
|
||
/** Shuts the exporter down. */ | ||
CompletableResultCode shutdown(); | ||
} |
29 changes: 29 additions & 0 deletions
29
...ommon/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/GrpcExporterBuilder.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.exporter.otlp.internal.grpc; | ||
|
||
import io.grpc.ManagedChannel; | ||
import io.opentelemetry.exporter.otlp.internal.Marshaler; | ||
import java.time.Duration; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
public interface GrpcExporterBuilder<T extends Marshaler> { | ||
GrpcExporterBuilder<T> setChannel(ManagedChannel channel); | ||
|
||
GrpcExporterBuilder<T> setTimeout(long timeout, TimeUnit unit); | ||
|
||
GrpcExporterBuilder<T> setTimeout(Duration timeout); | ||
|
||
GrpcExporterBuilder<T> setEndpoint(String endpoint); | ||
|
||
GrpcExporterBuilder<T> setCompression(String compressionMethod); | ||
|
||
GrpcExporterBuilder<T> setTrustedCertificates(byte[] trustedCertificatesPem); | ||
|
||
GrpcExporterBuilder<T> addHeader(String key, String value); | ||
|
||
GrpcExporter<T> build(); | ||
} |
19 changes: 19 additions & 0 deletions
19
...mmon/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/MarshalerServiceStub.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.exporter.otlp.internal.grpc; | ||
|
||
import com.google.common.util.concurrent.ListenableFuture; | ||
import io.grpc.CallOptions; | ||
import io.grpc.Channel; | ||
|
||
public abstract class MarshalerServiceStub<T, U, S extends MarshalerServiceStub<T, U, S>> | ||
extends io.grpc.stub.AbstractFutureStub<S> { | ||
protected MarshalerServiceStub(Channel channel, CallOptions callOptions) { | ||
super(channel, callOptions); | ||
} | ||
|
||
public abstract ListenableFuture<U> export(T request); | ||
} |
Oops, something went wrong.