Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for OTEL metrics source to use Kafka buffer #3539

Merged
merged 2 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ public class JacksonExponentialHistogram extends JacksonMetric implements Expone
private static final String SCALE_KEY = "scale";
private static final String AGGREGATION_TEMPORALITY_KEY = "aggregationTemporality";
private static final String ZERO_COUNT_KEY = "zeroCount";
private static final String POSITIVE_BUCKETS_KEY = "positiveBuckets";
private static final String NEGATIVE_BUCKETS_KEY = "negativeBuckets";
public static final String POSITIVE_BUCKETS_KEY = "positiveBuckets";
public static final String NEGATIVE_BUCKETS_KEY = "negativeBuckets";
private static final String NEGATIVE_KEY = "negative";
private static final String POSITIVE_KEY = "positive";
private static final String NEGATIVE_OFFSET_KEY = "negativeOffset";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class JacksonHistogram extends JacksonMetric implements Histogram {
private static final String AGGREGATION_TEMPORALITY_KEY = "aggregationTemporality";
private static final String BUCKET_COUNTS_KEY = "bucketCounts";
private static final String EXPLICIT_BOUNDS_COUNT_KEY = "explicitBoundsCount";
private static final String BUCKETS_KEY = "buckets";
public static final String BUCKETS_KEY = "buckets";
private static final String BUCKET_COUNTS_LIST_KEY = "bucketCountsList";
private static final String EXPLICIT_BOUNDS_KEY = "explicitBounds";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public abstract class JacksonMetric extends JacksonEvent implements Metric {
protected static final String SERVICE_NAME_KEY = "serviceName";
protected static final String KIND_KEY = "kind";
protected static final String UNIT_KEY = "unit";
protected static final String ATTRIBUTES_KEY = "attributes";
public static final String ATTRIBUTES_KEY = "attributes";
protected static final String SCHEMA_URL_KEY = "schemaUrl";
protected static final String EXEMPLARS_KEY = "exemplars";
protected static final String FLAGS_KEY = "flags";
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
*/

package org.opensearch.dataprepper.plugins.processor.otelmetrics;

import static org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec.DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE;
import com.fasterxml.jackson.annotation.JsonProperty;

public class OtelMetricsRawProcessorConfig {
Expand All @@ -15,7 +17,7 @@ public class OtelMetricsRawProcessorConfig {

private Boolean calculateExponentialHistogramBuckets = true;

private Integer exponentialHistogramMaxAllowedScale = 10;
private Integer exponentialHistogramMaxAllowedScale = DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE;

public Boolean getCalculateExponentialHistogramBuckets() {
return calculateExponentialHistogramBuckets;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.metric.JacksonMetric;

import java.util.Arrays;
import java.util.Collections;
import java.util.Collection;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -102,8 +103,9 @@ public void test() throws JsonProcessingException {

Record record = new Record<>(exportMetricRequest);

List<Record<Event>> rec = (List<Record<Event>>) rawProcessor.doExecute(Arrays.asList(record));
Record<Event> firstRecord = rec.get(0);
Collection<Record<?>> records = Arrays.asList((Record<?>)record);
List<Record<? extends org.opensearch.dataprepper.model.metric.Metric>> outputRecords = (List<Record<? extends org.opensearch.dataprepper.model.metric.Metric>>)rawProcessor.doExecute(records);
Record<JacksonMetric> firstRecord = (Record<JacksonMetric>)outputRecords.get(0);

ObjectMapper objectMapper = new ObjectMapper();
Map<String, Object> map = objectMapper.readValue(firstRecord.getData().toJsonString(), Map.class);
Expand Down Expand Up @@ -182,8 +184,11 @@ public void missingNameInvalidMetricTest() throws JsonProcessingException {
Record record = new Record<>(exportMetricRequest);
Record invalidRecord = new Record<>(exportMetricRequestWithInvalidMetric);

List<Record<Event>> rec = (List<Record<Event>>) rawProcessor.doExecute(Arrays.asList(record, invalidRecord));
org.hamcrest.MatcherAssert.assertThat(rec.size(), equalTo(1));
Collection<Record<?>> records = Arrays.asList((Record<?>)record, invalidRecord);
List<Record<? extends org.opensearch.dataprepper.model.metric.Metric>> outputRecords = (List<Record<? extends org.opensearch.dataprepper.model.metric.Metric>>)rawProcessor.doExecute(records);

//List<Record<Event>> rec = (List<Record<Event>>) rawProcessor.doExecute(Arrays.asList(record, invalidRecord));
org.hamcrest.MatcherAssert.assertThat(outputRecords.size(), equalTo(1));
}

private void assertSumProcessing(Map<String, Object> map) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.metric.JacksonMetric;

import java.util.Arrays;
import java.util.Collections;
import java.util.Collection;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -83,8 +84,9 @@ public void testSummaryProcessing() throws JsonProcessingException {

Record record = new Record<>(exportMetricRequest);

List<Record<Event>> rec = (List<Record<Event>>) rawProcessor.doExecute(Arrays.asList(record));
Record<Event> firstRecord = rec.get(0);
Collection<Record<?>> records = Arrays.asList((Record<?>)record);
List<Record<? extends org.opensearch.dataprepper.model.metric.Metric>> outputRecords = (List<Record<? extends org.opensearch.dataprepper.model.metric.Metric>>)rawProcessor.doExecute(records);
Record<JacksonMetric> firstRecord = (Record<JacksonMetric>)outputRecords.get(0);

ObjectMapper objectMapper = new ObjectMapper();
Map<String, Object> map = objectMapper.readValue(firstRecord.getData().toJsonString(), Map.class);
Expand Down
1 change: 1 addition & 0 deletions data-prepper-plugins/otel-metrics-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ dependencies {
implementation project(':data-prepper-plugins:blocking-buffer')
implementation libs.commons.codec
implementation project(':data-prepper-plugins:armeria-common')
implementation project(':data-prepper-plugins:otel-proto-common')
testImplementation project(':data-prepper-api').sourceSets.test.output
implementation libs.opentelemetry.proto
implementation libs.commons.io
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,11 @@ public void export(final ExportMetricsServiceRequest request, final StreamObserv

private void processRequest(final ExportMetricsServiceRequest request, final StreamObserver<ExportMetricsServiceResponse> responseObserver) {
try {
buffer.write(new Record<>(request), bufferWriteTimeoutInMillis);
if (buffer.isByteBuffer()) {
buffer.writeBytes(request.toByteArray(), null, bufferWriteTimeoutInMillis);
} else {
buffer.write(new Record<>(request), bufferWriteTimeoutInMillis);
}
} catch (Exception e) {
if (ServiceRequestContext.current().isTimedOut()) {
LOG.warn("Exception writing to buffer but request already timed out.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.model.codec.ByteDecoder;
import org.opensearch.dataprepper.plugins.otel.codec.OTelMetricDecoder;
import org.opensearch.dataprepper.plugins.certificate.CertificateProvider;
import org.opensearch.dataprepper.plugins.certificate.model.Certificate;
import org.opensearch.dataprepper.plugins.health.HealthGrpcService;
Expand Down Expand Up @@ -62,6 +64,7 @@ public class OTelMetricsSource implements Source<Record<ExportMetricsServiceRequ
private final CertificateProviderFactory certificateProviderFactory;
private final GrpcRequestExceptionHandler requestExceptionHandler;
private Server server;
private final ByteDecoder byteDecoder;

@DataPrepperPluginConstructor
public OTelMetricsSource(final OTelMetricsSourceConfig oTelMetricsSourceConfig, final PluginMetrics pluginMetrics,
Expand All @@ -79,6 +82,12 @@ public OTelMetricsSource(final OTelMetricsSourceConfig oTelMetricsSourceConfig,
this.pipelineName = pipelineDescription.getPipelineName();
this.authenticationProvider = createAuthenticationProvider(pluginFactory);
this.requestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics);
this.byteDecoder = new OTelMetricDecoder();
}

@Override
public ByteDecoder getDecoder() {
return byteDecoder;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.otel.codec;

import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import org.opensearch.dataprepper.model.codec.ByteDecoder;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.metric.Metric;


import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import java.io.IOException;
import java.io.InputStream;
import java.util.function.Consumer;


public class OTelMetricDecoder implements ByteDecoder {
private final OTelProtoCodec.OTelProtoDecoder otelProtoDecoder;
public OTelMetricDecoder() {
otelProtoDecoder = new OTelProtoCodec.OTelProtoDecoder();
}
public void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException {
ExportMetricsServiceRequest request = ExportMetricsServiceRequest.parseFrom(inputStream);
AtomicInteger droppedCounter = new AtomicInteger(0);
Collection<Record<? extends Metric>> records =
otelProtoDecoder.parseExportMetricsServiceRequest(request, droppedCounter, OTelProtoCodec.DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE, true, true, false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand this correctly, the pipeline will have different behavior depending on the buffer type. With the in-memory buffer, we place the whole request in the buffer and it is the first object out of the pipeline. With an external buffer, it appears that we are now providing the actual metrics as the first events in the buffer.

I think the pipelines should be consistent between the two buffers. That is, you can swap your buffer without having to change your pipeline structure.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't think of any way to make it 100% same. But the YAML file can stay same and it works same.

for (Record<? extends Metric> record: records) {
final JacksonEvent event = JacksonEvent.fromEvent(record.getData());
eventConsumer.accept(new Record<>(event));
}
}

}
Loading
Loading