-
Notifications
You must be signed in to change notification settings - Fork 873
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Migrate from gRPC to armeria for testing agent in memory exporter #5314
Changes from all commits
5c2b47b
49b59f4
c2bdfff
a0f6cf0
2f9b809
c470ad7
1cfc741
34bb257
dc4d91a
390b772
e2907b5
d22ca7e
d9d8e1c
1d47a04
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,30 +5,43 @@ | |
|
||
package io.opentelemetry.javaagent.testing.exporter; | ||
|
||
import io.opentelemetry.sdk.common.CompletableResultCode; | ||
import java.util.Arrays; | ||
import java.util.List; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
public class AgentTestingExporterFactory { | ||
|
||
static final OtlpInMemorySpanExporter spanExporter = new OtlpInMemorySpanExporter(); | ||
static final OtlpInMemoryMetricExporter metricExporter = new OtlpInMemoryMetricExporter(); | ||
static final OtlpInMemoryLogExporter logExporter = new OtlpInMemoryLogExporter(); | ||
static final OtlpInMemoryCollector collector = new OtlpInMemoryCollector(); | ||
|
||
static final OtlpInMemorySpanExporter spanExporter = new OtlpInMemorySpanExporter(collector); | ||
static final OtlpInMemoryMetricExporter metricExporter = | ||
new OtlpInMemoryMetricExporter(collector); | ||
static final OtlpInMemoryLogExporter logExporter = new OtlpInMemoryLogExporter(collector); | ||
|
||
public static List<byte[]> getSpanExportRequests() { | ||
return spanExporter.getCollectedExportRequests(); | ||
AgentTestingCustomizer.spanProcessor.forceFlush().join(10, TimeUnit.SECONDS); | ||
return collector.getTraceExportRequests(); | ||
} | ||
|
||
public static List<byte[]> getMetricExportRequests() { | ||
return metricExporter.getCollectedExportRequests(); | ||
return collector.getMetricsExportRequests(); | ||
} | ||
|
||
public static List<byte[]> getLogExportRequests() { | ||
return logExporter.getCollectedExportRequests(); | ||
AgentTestingLogsCustomizer.logProcessor.forceFlush().join(10, TimeUnit.SECONDS); | ||
return collector.getLogsExportRequests(); | ||
} | ||
|
||
public static void reset() { | ||
spanExporter.reset(); | ||
metricExporter.reset(); | ||
logExporter.reset(); | ||
// Finish any pending trace or log exports before resetting. There is no such thing as | ||
// "finishing" metrics so we don't flush it here. | ||
List<CompletableResultCode> results = | ||
Arrays.asList( | ||
AgentTestingLogsCustomizer.logProcessor.forceFlush(), | ||
AgentTestingCustomizer.spanProcessor.forceFlush()); | ||
CompletableResultCode.ofAll(results).join(10, TimeUnit.SECONDS); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. WDYT about using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oops realized that we don't want to flush metrics here anyways (there's no such thing as pending metric exports really, all exports happen at random times and are valid) |
||
collector.reset(); | ||
} | ||
|
||
public static boolean forceFlushCalled() { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.javaagent.testing.exporter; | ||
|
||
import static java.util.concurrent.CompletableFuture.completedFuture; | ||
|
||
import com.linecorp.armeria.server.Server; | ||
import com.linecorp.armeria.server.ServiceRequestContext; | ||
import com.linecorp.armeria.server.grpc.protocol.AbstractUnaryGrpcService; | ||
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse; | ||
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; | ||
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Queue; | ||
import java.util.concurrent.CompletionStage; | ||
import java.util.concurrent.ConcurrentLinkedQueue; | ||
|
||
final class OtlpInMemoryCollector { | ||
|
||
private final OtlpService logsService; | ||
private final OtlpService metricsService; | ||
private final OtlpService traceService; | ||
|
||
private final Server server; | ||
|
||
// To shutdown after all exporters are shutdown; | ||
private int refCnt; | ||
|
||
OtlpInMemoryCollector() { | ||
logsService = new OtlpService(ExportLogsServiceResponse.getDefaultInstance().toByteArray()); | ||
metricsService = | ||
new OtlpService(ExportMetricsServiceResponse.getDefaultInstance().toByteArray()); | ||
traceService = new OtlpService(ExportTraceServiceResponse.getDefaultInstance().toByteArray()); | ||
server = | ||
Server.builder() | ||
.service("/opentelemetry.proto.collector.logs.v1.LogsService/Export", logsService) | ||
.service( | ||
"/opentelemetry.proto.collector.metrics.v1.MetricsService/Export", metricsService) | ||
.service("/opentelemetry.proto.collector.trace.v1.TraceService/Export", traceService) | ||
.build(); | ||
} | ||
|
||
synchronized void start() { | ||
if (refCnt == 0) { | ||
server.start().join(); | ||
} | ||
refCnt++; | ||
} | ||
|
||
synchronized void stop() { | ||
refCnt--; | ||
if (refCnt == 0) { | ||
server.stop(); | ||
} | ||
} | ||
|
||
String getEndpoint() { | ||
return "http://localhost:" + server.activeLocalPort(); | ||
} | ||
|
||
List<byte[]> getLogsExportRequests() { | ||
return logsService.getCollectedRequests(); | ||
} | ||
|
||
List<byte[]> getMetricsExportRequests() { | ||
return metricsService.getCollectedRequests(); | ||
} | ||
|
||
List<byte[]> getTraceExportRequests() { | ||
return traceService.getCollectedRequests(); | ||
} | ||
|
||
void reset() { | ||
logsService.reset(); | ||
metricsService.reset(); | ||
traceService.reset(); | ||
} | ||
|
||
private static final class OtlpService extends AbstractUnaryGrpcService { | ||
private final Queue<byte[]> collectedRequests = new ConcurrentLinkedQueue<>(); | ||
|
||
private final byte[] response; | ||
|
||
OtlpService(byte[] response) { | ||
this.response = response; | ||
} | ||
|
||
List<byte[]> getCollectedRequests() { | ||
return new ArrayList<>(collectedRequests); | ||
} | ||
|
||
void reset() { | ||
collectedRequests.clear(); | ||
} | ||
|
||
@Override | ||
protected CompletionStage<byte[]> handleMessage(ServiceRequestContext ctx, byte[] message) { | ||
collectedRequests.add(message); | ||
return completedFuture(response); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you add a brief comment explaining why this is needed (or just pointing to the PR discussion)?