Skip to content

Commit

Permalink
Use minimal fallback managed channel when none is specified (#6110)
Browse files Browse the repository at this point in the history
  • Loading branch information
jack-berg authored Jan 4, 2024
1 parent b4ed532 commit da7796b
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public void setUp() {
"span",
new UpstreamGrpcSender<>(
MarshalerTraceServiceGrpc.newFutureStub(defaultGrpcChannel, null),
/* shutdownChannel= */ false,
10,
Collections::emptyMap),
MeterProvider::noop);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package io.opentelemetry.exporter.otlp.testing.internal;

import static org.assertj.core.api.Assertions.as;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand All @@ -23,8 +24,10 @@
import com.linecorp.armeria.testing.junit5.server.SelfSignedCertificateExtension;
import com.linecorp.armeria.testing.junit5.server.ServerExtension;
import io.github.netmikey.logunit.api.LogCapturer;
import io.grpc.ManagedChannel;
import io.opentelemetry.exporter.internal.TlsUtil;
import io.opentelemetry.exporter.internal.grpc.GrpcExporter;
import io.opentelemetry.exporter.internal.grpc.MarshalerServiceStub;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.internal.testing.slf4j.SuppressLogger;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
Expand Down Expand Up @@ -61,6 +64,7 @@
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509KeyManager;
import javax.net.ssl.X509TrustManager;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.assertj.core.api.iterable.ThrowingExtractor;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -215,6 +219,23 @@ void reset() {
httpRequests.clear();
}

@Test
void minimalChannel() {
// Test that UpstreamGrpcSender uses minimal fallback managed channel, so skip for
// OkHttpGrpcSender
assumeThat(exporter.unwrap())
.extracting("delegate.grpcSender")
.matches(sender -> sender.getClass().getSimpleName().equals("UpstreamGrpcSender"));
// When no channel is explicitly set, should fall back to a minimally configured managed channel
TelemetryExporter<?> exporter = exporterBuilder().build();
assertThat(exporter.shutdown().join(10, TimeUnit.SECONDS).isSuccess()).isTrue();
assertThat(exporter.unwrap())
.extracting(
"delegate.grpcSender.stub",
as(InstanceOfAssertFactories.type(MarshalerServiceStub.class)))
.satisfies(stub -> assertThat(((ManagedChannel) stub.getChannel()).isShutdown()).isTrue());
}

@Test
void export() {
List<T> telemetry = Collections.singletonList(generateFakeTelemetry());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,16 +158,21 @@ public TelemetryExporterBuilder<T> setChannel(Object channel) {

@Override
public TelemetryExporter<T> build() {
requireNonNull(channelBuilder, "channel");
Runnable shutdownCallback;
if (channelBuilder != null) {
try {
setSslContext(channelBuilder, tlsConfigHelper);
} catch (SSLException e) {
throw new IllegalStateException(e);
}

try {
setSslContext(channelBuilder, tlsConfigHelper);
} catch (SSLException e) {
throw new IllegalStateException(e);
ManagedChannel channel = channelBuilder.build();
delegate.setChannel(channel);
shutdownCallback = channel::shutdownNow;
} else {
shutdownCallback = () -> {};
}

ManagedChannel channel = channelBuilder.build();
delegate.setChannel(channel);
TelemetryExporter<T> delegateExporter = delegate.build();
return new TelemetryExporter<T>() {
@Override
Expand All @@ -182,7 +187,7 @@ public CompletableResultCode export(Collection<T> items) {

@Override
public CompletableResultCode shutdown() {
channel.shutdownNow();
shutdownCallback.run();
return delegateExporter.shutdown();
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.stub.MetadataUtils;
Expand All @@ -32,16 +33,19 @@
public final class UpstreamGrpcSender<T extends Marshaler> implements GrpcSender<T> {

private final MarshalerServiceStub<T, ?, ?> stub;
private final boolean shutdownChannel;
private final long timeoutNanos;
private final Supplier<Map<String, List<String>>> headersSupplier;

/** Creates a new {@link UpstreamGrpcSender}. */
public UpstreamGrpcSender(
MarshalerServiceStub<T, ?, ?> stub,
boolean shutdownChannel,
long timeoutNanos,
Supplier<Map<String, List<String>>> headersSupplier) {
this.timeoutNanos = timeoutNanos;
this.stub = stub;
this.shutdownChannel = shutdownChannel;
this.timeoutNanos = timeoutNanos;
this.headersSupplier = headersSupplier;
}

Expand Down Expand Up @@ -82,6 +86,10 @@ public void onFailure(Throwable t) {

@Override
public CompletableResultCode shutdown() {
if (shutdownChannel) {
ManagedChannel channel = (ManagedChannel) stub.getChannel();
channel.shutdownNow();
}
return CompletableResultCode.ofSuccess();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import io.grpc.Channel;
import io.grpc.Codec;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.opentelemetry.exporter.internal.grpc.GrpcSender;
import io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider;
import io.opentelemetry.exporter.internal.grpc.MarshalerServiceStub;
Expand Down Expand Up @@ -41,6 +43,13 @@ public <T extends Marshaler> GrpcSender<T> createSender(
@Nullable RetryPolicy retryPolicy,
@Nullable SSLContext sslContext,
@Nullable X509TrustManager trustManager) {
boolean shutdownChannel = false;
if (managedChannel == null) {
// Shutdown the channel as part of the exporter shutdown sequence if
shutdownChannel = true;
managedChannel = minimalFallbackManagedChannel(endpoint);
}

String authorityOverride = null;
Map<String, List<String>> headers = headersSupplier.get();
if (headers != null) {
Expand All @@ -58,6 +67,27 @@ public <T extends Marshaler> GrpcSender<T> createSender(
.apply((Channel) managedChannel, authorityOverride)
.withCompression(codec.getMessageEncoding());

return new UpstreamGrpcSender<>(stub, timeoutNanos, headersSupplier);
return new UpstreamGrpcSender<>(stub, shutdownChannel, timeoutNanos, headersSupplier);
}

/**
* If {@link ManagedChannel} is not explicitly set, provide a minimally configured fallback
* channel to avoid failing initialization.
*
* <p>This is required to accommodate autoconfigure with {@code
* opentelemetry-exporter-sender-grpc-managed-channel} which will always fail to initialize
* without a fallback channel since there isn't an opportunity to explicitly set the channel.
*
* <p>This only incorporates the target address, port, and whether to use plain text. All
* additional settings are intentionally ignored and must be configured with an explicitly set
* {@link ManagedChannel}.
*/
private static ManagedChannel minimalFallbackManagedChannel(URI endpoint) {
ManagedChannelBuilder<?> channelBuilder =
ManagedChannelBuilder.forAddress(endpoint.getHost(), endpoint.getPort());
if (!endpoint.getScheme().equals("https")) {
channelBuilder.usePlaintext();
}
return channelBuilder.build();
}
}

0 comments on commit da7796b

Please sign in to comment.