Skip to content

Commit

Permalink
Add support for OTEL metrics source to use Kafka buffer (#3539)
Browse files Browse the repository at this point in the history
* Add support for OTEL metrics source to use Kafka buffer

Signed-off-by: Krishna Kondaka <[email protected]>

* Added tests and fixed test failures

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka and Krishna Kondaka authored Nov 9, 2023
1 parent 85aba4d commit bc504fd
Show file tree
Hide file tree
Showing 16 changed files with 626 additions and 250 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ public class JacksonExponentialHistogram extends JacksonMetric implements Expone
private static final String SCALE_KEY = "scale";
private static final String AGGREGATION_TEMPORALITY_KEY = "aggregationTemporality";
private static final String ZERO_COUNT_KEY = "zeroCount";
private static final String POSITIVE_BUCKETS_KEY = "positiveBuckets";
private static final String NEGATIVE_BUCKETS_KEY = "negativeBuckets";
public static final String POSITIVE_BUCKETS_KEY = "positiveBuckets";
public static final String NEGATIVE_BUCKETS_KEY = "negativeBuckets";
private static final String NEGATIVE_KEY = "negative";
private static final String POSITIVE_KEY = "positive";
private static final String NEGATIVE_OFFSET_KEY = "negativeOffset";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class JacksonHistogram extends JacksonMetric implements Histogram {
private static final String AGGREGATION_TEMPORALITY_KEY = "aggregationTemporality";
private static final String BUCKET_COUNTS_KEY = "bucketCounts";
private static final String EXPLICIT_BOUNDS_COUNT_KEY = "explicitBoundsCount";
private static final String BUCKETS_KEY = "buckets";
public static final String BUCKETS_KEY = "buckets";
private static final String BUCKET_COUNTS_LIST_KEY = "bucketCountsList";
private static final String EXPLICIT_BOUNDS_KEY = "explicitBounds";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public abstract class JacksonMetric extends JacksonEvent implements Metric {
protected static final String SERVICE_NAME_KEY = "serviceName";
protected static final String KIND_KEY = "kind";
protected static final String UNIT_KEY = "unit";
protected static final String ATTRIBUTES_KEY = "attributes";
public static final String ATTRIBUTES_KEY = "attributes";
protected static final String SCHEMA_URL_KEY = "schemaUrl";
protected static final String EXEMPLARS_KEY = "exemplars";
protected static final String FLAGS_KEY = "flags";
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
*/

package org.opensearch.dataprepper.plugins.processor.otelmetrics;

import static org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec.DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE;
import com.fasterxml.jackson.annotation.JsonProperty;

public class OtelMetricsRawProcessorConfig {
Expand All @@ -15,7 +17,7 @@ public class OtelMetricsRawProcessorConfig {

private Boolean calculateExponentialHistogramBuckets = true;

private Integer exponentialHistogramMaxAllowedScale = 10;
private Integer exponentialHistogramMaxAllowedScale = DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE;

public Boolean getCalculateExponentialHistogramBuckets() {
return calculateExponentialHistogramBuckets;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.metric.JacksonMetric;

import java.util.Arrays;
import java.util.Collections;
import java.util.Collection;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -102,8 +103,9 @@ public void test() throws JsonProcessingException {

Record record = new Record<>(exportMetricRequest);

List<Record<Event>> rec = (List<Record<Event>>) rawProcessor.doExecute(Arrays.asList(record));
Record<Event> firstRecord = rec.get(0);
Collection<Record<?>> records = Arrays.asList((Record<?>)record);
List<Record<? extends org.opensearch.dataprepper.model.metric.Metric>> outputRecords = (List<Record<? extends org.opensearch.dataprepper.model.metric.Metric>>)rawProcessor.doExecute(records);
Record<JacksonMetric> firstRecord = (Record<JacksonMetric>)outputRecords.get(0);

ObjectMapper objectMapper = new ObjectMapper();
Map<String, Object> map = objectMapper.readValue(firstRecord.getData().toJsonString(), Map.class);
Expand Down Expand Up @@ -182,8 +184,11 @@ public void missingNameInvalidMetricTest() throws JsonProcessingException {
Record record = new Record<>(exportMetricRequest);
Record invalidRecord = new Record<>(exportMetricRequestWithInvalidMetric);

List<Record<Event>> rec = (List<Record<Event>>) rawProcessor.doExecute(Arrays.asList(record, invalidRecord));
org.hamcrest.MatcherAssert.assertThat(rec.size(), equalTo(1));
Collection<Record<?>> records = Arrays.asList((Record<?>)record, invalidRecord);
List<Record<? extends org.opensearch.dataprepper.model.metric.Metric>> outputRecords = (List<Record<? extends org.opensearch.dataprepper.model.metric.Metric>>)rawProcessor.doExecute(records);

//List<Record<Event>> rec = (List<Record<Event>>) rawProcessor.doExecute(Arrays.asList(record, invalidRecord));
org.hamcrest.MatcherAssert.assertThat(outputRecords.size(), equalTo(1));
}

private void assertSumProcessing(Map<String, Object> map) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.metric.JacksonMetric;

import java.util.Arrays;
import java.util.Collections;
import java.util.Collection;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -83,8 +84,9 @@ public void testSummaryProcessing() throws JsonProcessingException {

Record record = new Record<>(exportMetricRequest);

List<Record<Event>> rec = (List<Record<Event>>) rawProcessor.doExecute(Arrays.asList(record));
Record<Event> firstRecord = rec.get(0);
Collection<Record<?>> records = Arrays.asList((Record<?>)record);
List<Record<? extends org.opensearch.dataprepper.model.metric.Metric>> outputRecords = (List<Record<? extends org.opensearch.dataprepper.model.metric.Metric>>)rawProcessor.doExecute(records);
Record<JacksonMetric> firstRecord = (Record<JacksonMetric>)outputRecords.get(0);

ObjectMapper objectMapper = new ObjectMapper();
Map<String, Object> map = objectMapper.readValue(firstRecord.getData().toJsonString(), Map.class);
Expand Down
1 change: 1 addition & 0 deletions data-prepper-plugins/otel-metrics-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ dependencies {
implementation project(':data-prepper-plugins:blocking-buffer')
implementation libs.commons.codec
implementation project(':data-prepper-plugins:armeria-common')
implementation project(':data-prepper-plugins:otel-proto-common')
testImplementation project(':data-prepper-api').sourceSets.test.output
implementation libs.opentelemetry.proto
implementation libs.commons.io
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,11 @@ public void export(final ExportMetricsServiceRequest request, final StreamObserv

private void processRequest(final ExportMetricsServiceRequest request, final StreamObserver<ExportMetricsServiceResponse> responseObserver) {
try {
buffer.write(new Record<>(request), bufferWriteTimeoutInMillis);
if (buffer.isByteBuffer()) {
buffer.writeBytes(request.toByteArray(), null, bufferWriteTimeoutInMillis);
} else {
buffer.write(new Record<>(request), bufferWriteTimeoutInMillis);
}
} catch (Exception e) {
if (ServiceRequestContext.current().isTimedOut()) {
LOG.warn("Exception writing to buffer but request already timed out.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.model.codec.ByteDecoder;
import org.opensearch.dataprepper.plugins.otel.codec.OTelMetricDecoder;
import org.opensearch.dataprepper.plugins.certificate.CertificateProvider;
import org.opensearch.dataprepper.plugins.certificate.model.Certificate;
import org.opensearch.dataprepper.plugins.health.HealthGrpcService;
Expand Down Expand Up @@ -62,6 +64,7 @@ public class OTelMetricsSource implements Source<Record<ExportMetricsServiceRequ
private final CertificateProviderFactory certificateProviderFactory;
private final GrpcRequestExceptionHandler requestExceptionHandler;
private Server server;
private final ByteDecoder byteDecoder;

@DataPrepperPluginConstructor
public OTelMetricsSource(final OTelMetricsSourceConfig oTelMetricsSourceConfig, final PluginMetrics pluginMetrics,
Expand All @@ -79,6 +82,12 @@ public OTelMetricsSource(final OTelMetricsSourceConfig oTelMetricsSourceConfig,
this.pipelineName = pipelineDescription.getPipelineName();
this.authenticationProvider = createAuthenticationProvider(pluginFactory);
this.requestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics);
this.byteDecoder = new OTelMetricDecoder();
}

@Override
public ByteDecoder getDecoder() {
return byteDecoder;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.otel.codec;

import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import org.opensearch.dataprepper.model.codec.ByteDecoder;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.metric.Metric;


import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import java.io.IOException;
import java.io.InputStream;
import java.util.function.Consumer;


public class OTelMetricDecoder implements ByteDecoder {
private final OTelProtoCodec.OTelProtoDecoder otelProtoDecoder;
public OTelMetricDecoder() {
otelProtoDecoder = new OTelProtoCodec.OTelProtoDecoder();
}
public void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException {
ExportMetricsServiceRequest request = ExportMetricsServiceRequest.parseFrom(inputStream);
AtomicInteger droppedCounter = new AtomicInteger(0);
Collection<Record<? extends Metric>> records =
otelProtoDecoder.parseExportMetricsServiceRequest(request, droppedCounter, OTelProtoCodec.DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE, true, true, false);
for (Record<? extends Metric> record: records) {
final JacksonEvent event = JacksonEvent.fromEvent(record.getData());
eventConsumer.accept(new Record<>(event));
}
}

}
Loading

0 comments on commit bc504fd

Please sign in to comment.