From ca101b5663ff93c4e618ed80e4d9a6772c42750c Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Mon, 23 Oct 2023 14:33:13 +0000 Subject: [PATCH 1/2] Add support for OTEL metrics source to use Kafka buffer Signed-off-by: Krishna Kondaka --- .../metric/JacksonExponentialHistogram.java | 4 +- .../model/metric/JacksonHistogram.java | 2 +- .../model/metric/JacksonMetric.java | 2 +- .../otelmetrics/OTelMetricsRawProcessor.java | 286 ++++-------------- .../OtelMetricsRawProcessorConfig.java | 4 +- .../otel-metrics-source/build.gradle | 1 + .../otelmetrics/OTelMetricsGrpcService.java | 10 +- .../source/otelmetrics/OTelMetricsSource.java | 10 + .../plugins/otel/codec/OTelMetricDecoder.java | 40 +++ .../plugins/otel/codec/OTelProtoCodec.java | 276 +++++++++++++++++ 10 files changed, 395 insertions(+), 240 deletions(-) create mode 100644 data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelMetricDecoder.java diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonExponentialHistogram.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonExponentialHistogram.java index b52c850e46..b865ce0eb5 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonExponentialHistogram.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonExponentialHistogram.java @@ -27,8 +27,8 @@ public class JacksonExponentialHistogram extends JacksonMetric implements Expone private static final String SCALE_KEY = "scale"; private static final String AGGREGATION_TEMPORALITY_KEY = "aggregationTemporality"; private static final String ZERO_COUNT_KEY = "zeroCount"; - private static final String POSITIVE_BUCKETS_KEY = "positiveBuckets"; - private static final String NEGATIVE_BUCKETS_KEY = "negativeBuckets"; + public static final String POSITIVE_BUCKETS_KEY = "positiveBuckets"; + public static final String NEGATIVE_BUCKETS_KEY = "negativeBuckets"; private static final String NEGATIVE_KEY = "negative"; private static final String POSITIVE_KEY = "positive"; private static final String NEGATIVE_OFFSET_KEY = "negativeOffset"; diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonHistogram.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonHistogram.java index 0209f7012d..f9e066875d 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonHistogram.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonHistogram.java @@ -29,7 +29,7 @@ public class JacksonHistogram extends JacksonMetric implements Histogram { private static final String AGGREGATION_TEMPORALITY_KEY = "aggregationTemporality"; private static final String BUCKET_COUNTS_KEY = "bucketCounts"; private static final String EXPLICIT_BOUNDS_COUNT_KEY = "explicitBoundsCount"; - private static final String BUCKETS_KEY = "buckets"; + public static final String BUCKETS_KEY = "buckets"; private static final String BUCKET_COUNTS_LIST_KEY = "bucketCountsList"; private static final String EXPLICIT_BOUNDS_KEY = "explicitBounds"; diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonMetric.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonMetric.java index 0ab81ed7e0..8d8ebf0f87 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonMetric.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonMetric.java @@ -28,7 +28,7 @@ public abstract class JacksonMetric extends JacksonEvent implements Metric { protected static final String SERVICE_NAME_KEY = "serviceName"; protected static final String KIND_KEY = "kind"; protected static final String UNIT_KEY = "unit"; - protected static final String ATTRIBUTES_KEY = "attributes"; + public static final String ATTRIBUTES_KEY = "attributes"; protected static final String SCHEMA_URL_KEY = "schemaUrl"; protected static final String EXEMPLARS_KEY = "exemplars"; protected static final String FLAGS_KEY = "flags"; diff --git a/data-prepper-plugins/otel-metrics-raw-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/OTelMetricsRawProcessor.java b/data-prepper-plugins/otel-metrics-raw-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/OTelMetricsRawProcessor.java index e180e48ac3..3e8f0874a8 100644 --- a/data-prepper-plugins/otel-metrics-raw-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/OTelMetricsRawProcessor.java +++ b/data-prepper-plugins/otel-metrics-raw-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/OTelMetricsRawProcessor.java @@ -6,35 +6,32 @@ package org.opensearch.dataprepper.plugins.processor.otelmetrics; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; -import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics; -import io.opentelemetry.proto.metrics.v1.ResourceMetrics; -import io.opentelemetry.proto.metrics.v1.ScopeMetrics; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.configuration.PluginSetting; -import org.opensearch.dataprepper.model.metric.JacksonExponentialHistogram; -import org.opensearch.dataprepper.model.metric.JacksonGauge; -import org.opensearch.dataprepper.model.metric.JacksonHistogram; -import org.opensearch.dataprepper.model.metric.JacksonSum; -import org.opensearch.dataprepper.model.metric.JacksonSummary; +import static org.opensearch.dataprepper.model.metric.JacksonExponentialHistogram.POSITIVE_BUCKETS_KEY; +import static org.opensearch.dataprepper.model.metric.JacksonExponentialHistogram.NEGATIVE_BUCKETS_KEY; +import static org.opensearch.dataprepper.model.metric.JacksonHistogram.BUCKETS_KEY; import org.opensearch.dataprepper.model.metric.Metric; +import static org.opensearch.dataprepper.model.metric.JacksonMetric.ATTRIBUTES_KEY; import org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec; import io.micrometer.core.instrument.Counter; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; -import java.util.List; import java.util.Map; -import java.util.stream.Collectors; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; @DataPrepperPlugin(name = "otel_metrics", deprecatedName = "otel_metrics_raw_processor", pluginType = Processor.class, pluginConfigurationType = OtelMetricsRawProcessorConfig.class) -public class OTelMetricsRawProcessor extends AbstractProcessor, Record> { +public class OTelMetricsRawProcessor extends AbstractProcessor, Record> { private static final Logger LOG = LoggerFactory.getLogger(OTelMetricsRawProcessor.class); public static final String RECORDS_DROPPED_METRICS_RAW = "recordsDroppedMetricsRaw"; @@ -52,240 +49,61 @@ public OTelMetricsRawProcessor(PluginSetting pluginSetting, final OtelMetricsRaw this.flattenAttributesFlag = otelMetricsRawProcessorConfig.getFlattenAttributesFlag(); } - @Override - public Collection> doExecute(Collection> records) { - Collection> recordsOut = new ArrayList<>(); - for (Record ets : records) { - for (ResourceMetrics rs : ets.getData().getResourceMetricsList()) { - final String schemaUrl = rs.getSchemaUrl(); - final Map resourceAttributes = OTelProtoCodec.getResourceAttributes(rs.getResource()); - final String serviceName = OTelProtoCodec.getServiceName(rs.getResource()).orElse(null); + private void modifyRecord(Record record, + boolean flattenAttributes, + boolean calcualteHistogramBuckets, + boolean calcualteExponentialHistogramBuckets) { + Event event = (Event)record.getData(); - for (InstrumentationLibraryMetrics is : rs.getInstrumentationLibraryMetricsList()) { - final Map ils = OTelProtoCodec.getInstrumentationLibraryAttributes(is.getInstrumentationLibrary()); - recordsOut.addAll(processMetricsList(is.getMetricsList(), serviceName, ils, resourceAttributes, schemaUrl)); - } + if (flattenAttributes) { + Map attributes = event.get(ATTRIBUTES_KEY, Map.class); - for (ScopeMetrics sm : rs.getScopeMetricsList()) { - final Map ils = OTelProtoCodec.getInstrumentationScopeAttributes(sm.getScope()); - recordsOut.addAll(processMetricsList(sm.getMetricsList(), serviceName, ils, resourceAttributes, schemaUrl)); - } + for (Map.Entry entry : attributes.entrySet()) { + event.put(entry.getKey(), entry.getValue()); } + event.delete(ATTRIBUTES_KEY); } - return recordsOut; - } - - private List> processMetricsList(final List metricsList, - final String serviceName, - final Map ils, - final Map resourceAttributes, - final String schemaUrl) { - List> recordsOut = new ArrayList<>(); - for (io.opentelemetry.proto.metrics.v1.Metric metric : metricsList) { - try { - if (metric.hasGauge()) { - recordsOut.addAll(mapGauge(metric, serviceName, ils, resourceAttributes, schemaUrl)); - } else if (metric.hasSum()) { - recordsOut.addAll(mapSum(metric, serviceName, ils, resourceAttributes, schemaUrl)); - } else if (metric.hasSummary()) { - recordsOut.addAll(mapSummary(metric, serviceName, ils, resourceAttributes, schemaUrl)); - } else if (metric.hasHistogram()) { - recordsOut.addAll(mapHistogram(metric, serviceName, ils, resourceAttributes, schemaUrl)); - } else if (metric.hasExponentialHistogram()) { - recordsOut.addAll(mapExponentialHistogram(metric, serviceName, ils, resourceAttributes, schemaUrl)); - } - } catch (Exception e) { - LOG.warn("Error while processing metrics", e); - recordsDroppedMetricsRawCounter.increment(); + if (!calcualteHistogramBuckets && event.get(BUCKETS_KEY, List.class) != null) { + event.delete(BUCKETS_KEY); + } + if (!calcualteExponentialHistogramBuckets) { + if (event.get(POSITIVE_BUCKETS_KEY, List.class) != null) { + event.delete(POSITIVE_BUCKETS_KEY); + } + if (event.get(NEGATIVE_BUCKETS_KEY, List.class) != null) { + event.delete(NEGATIVE_BUCKETS_KEY); } } - return recordsOut; - } - - private List> mapGauge(io.opentelemetry.proto.metrics.v1.Metric metric, - String serviceName, - final Map ils, - final Map resourceAttributes, - final String schemaUrl) { - return metric.getGauge().getDataPointsList().stream() - .map(dp -> JacksonGauge.builder() - .withUnit(metric.getUnit()) - .withName(metric.getName()) - .withDescription(metric.getDescription()) - .withStartTime(OTelProtoCodec.getStartTimeISO8601(dp)) - .withTime(OTelProtoCodec.getTimeISO8601(dp)) - .withServiceName(serviceName) - .withValue(OTelProtoCodec.getValueAsDouble(dp)) - .withAttributes(OTelProtoCodec.mergeAllAttributes( - Arrays.asList( - OTelProtoCodec.convertKeysOfDataPointAttributes(dp), - resourceAttributes, - ils - ) - )) - .withSchemaUrl(schemaUrl) - .withExemplars(OTelProtoCodec.convertExemplars(dp.getExemplarsList())) - .withFlags(dp.getFlags()) - .build(flattenAttributesFlag)) - .map(Record::new) - .collect(Collectors.toList()); } - private List> mapSum(final io.opentelemetry.proto.metrics.v1.Metric metric, - final String serviceName, - final Map ils, - final Map resourceAttributes, - final String schemaUrl) { - return metric.getSum().getDataPointsList().stream() - .map(dp -> JacksonSum.builder() - .withUnit(metric.getUnit()) - .withName(metric.getName()) - .withDescription(metric.getDescription()) - .withStartTime(OTelProtoCodec.getStartTimeISO8601(dp)) - .withTime(OTelProtoCodec.getTimeISO8601(dp)) - .withServiceName(serviceName) - .withIsMonotonic(metric.getSum().getIsMonotonic()) - .withValue(OTelProtoCodec.getValueAsDouble(dp)) - .withAggregationTemporality(metric.getSum().getAggregationTemporality().toString()) - .withAttributes(OTelProtoCodec.mergeAllAttributes( - Arrays.asList( - OTelProtoCodec.convertKeysOfDataPointAttributes(dp), - resourceAttributes, - ils - ) - )) - .withSchemaUrl(schemaUrl) - .withExemplars(OTelProtoCodec.convertExemplars(dp.getExemplarsList())) - .withFlags(dp.getFlags()) - .build(flattenAttributesFlag)) - .map(Record::new) - .collect(Collectors.toList()); - } - - private List> mapSummary(final io.opentelemetry.proto.metrics.v1.Metric metric, - final String serviceName, - final Map ils, - final Map resourceAttributes, - final String schemaUrl) { - return metric.getSummary().getDataPointsList().stream() - .map(dp -> JacksonSummary.builder() - .withUnit(metric.getUnit()) - .withName(metric.getName()) - .withDescription(metric.getDescription()) - .withStartTime(OTelProtoCodec.convertUnixNanosToISO8601(dp.getStartTimeUnixNano())) - .withTime(OTelProtoCodec.convertUnixNanosToISO8601(dp.getTimeUnixNano())) - .withServiceName(serviceName) - .withCount(dp.getCount()) - .withSum(dp.getSum()) - .withQuantiles(OTelProtoCodec.getQuantileValues(dp.getQuantileValuesList())) - .withQuantilesValueCount(dp.getQuantileValuesCount()) - .withAttributes(OTelProtoCodec.mergeAllAttributes( - Arrays.asList( - OTelProtoCodec.unpackKeyValueList(dp.getAttributesList()), - resourceAttributes, - ils - ) - )) - .withSchemaUrl(schemaUrl) - .withFlags(dp.getFlags()) - .build(flattenAttributesFlag)) - .map(Record::new) - .collect(Collectors.toList()); - } - - private List> mapHistogram(final io.opentelemetry.proto.metrics.v1.Metric metric, - final String serviceName, - final Map ils, - final Map resourceAttributes, - final String schemaUrl) { - return metric.getHistogram().getDataPointsList().stream() - .map(dp -> { - JacksonHistogram.Builder builder = JacksonHistogram.builder() - .withUnit(metric.getUnit()) - .withName(metric.getName()) - .withDescription(metric.getDescription()) - .withStartTime(OTelProtoCodec.convertUnixNanosToISO8601(dp.getStartTimeUnixNano())) - .withTime(OTelProtoCodec.convertUnixNanosToISO8601(dp.getTimeUnixNano())) - .withServiceName(serviceName) - .withSum(dp.getSum()) - .withCount(dp.getCount()) - .withBucketCount(dp.getBucketCountsCount()) - .withExplicitBoundsCount(dp.getExplicitBoundsCount()) - .withAggregationTemporality(metric.getHistogram().getAggregationTemporality().toString()) - .withBucketCountsList(dp.getBucketCountsList()) - .withExplicitBoundsList(dp.getExplicitBoundsList()) - .withAttributes(OTelProtoCodec.mergeAllAttributes( - Arrays.asList( - OTelProtoCodec.unpackKeyValueList(dp.getAttributesList()), - resourceAttributes, - ils - ) - )) - .withSchemaUrl(schemaUrl) - .withExemplars(OTelProtoCodec.convertExemplars(dp.getExemplarsList())) - .withFlags(dp.getFlags()); - if (otelMetricsRawProcessorConfig.getCalculateHistogramBuckets()) { - builder.withBuckets(OTelProtoCodec.createBuckets(dp.getBucketCountsList(), dp.getExplicitBoundsList())); - } - JacksonHistogram jh = builder.build(flattenAttributesFlag); - return jh; - - }) - .map(Record::new) - .collect(Collectors.toList()); - } - - private List> mapExponentialHistogram(io.opentelemetry.proto.metrics.v1.Metric metric, String serviceName, Map ils, Map resourceAttributes, String schemaUrl) { - return metric.getExponentialHistogram().getDataPointsList().stream() - .filter(dp -> { - if (otelMetricsRawProcessorConfig.getCalculateExponentialHistogramBuckets() && - otelMetricsRawProcessorConfig.getExponentialHistogramMaxAllowedScale() < Math.abs(dp.getScale())){ - LOG.error("Exponential histogram can not be processed since its scale of {} is bigger than the configured max of {}.", dp.getScale(), otelMetricsRawProcessorConfig.getExponentialHistogramMaxAllowedScale()); - return false; - } else { - return true; - } - }) - .map(dp -> { - JacksonExponentialHistogram.Builder builder = JacksonExponentialHistogram.builder() - .withUnit(metric.getUnit()) - .withName(metric.getName()) - .withDescription(metric.getDescription()) - .withStartTime(OTelProtoCodec.convertUnixNanosToISO8601(dp.getStartTimeUnixNano())) - .withTime(OTelProtoCodec.convertUnixNanosToISO8601(dp.getTimeUnixNano())) - .withServiceName(serviceName) - .withSum(dp.getSum()) - .withCount(dp.getCount()) - .withZeroCount(dp.getZeroCount()) - .withScale(dp.getScale()) - .withPositive(dp.getPositive().getBucketCountsList()) - .withPositiveOffset(dp.getPositive().getOffset()) - .withNegative(dp.getNegative().getBucketCountsList()) - .withNegativeOffset(dp.getNegative().getOffset()) - .withAggregationTemporality(metric.getHistogram().getAggregationTemporality().toString()) - .withAttributes(OTelProtoCodec.mergeAllAttributes( - Arrays.asList( - OTelProtoCodec.unpackKeyValueList(dp.getAttributesList()), - resourceAttributes, - ils - ) - )) - .withSchemaUrl(schemaUrl) - .withExemplars(OTelProtoCodec.convertExemplars(dp.getExemplarsList())) - .withFlags(dp.getFlags()); + @Override + public Collection> doExecute(Collection> records) { + Collection> recordsOut = new ArrayList<>(); + OTelProtoCodec.OTelProtoDecoder otelProtoDecoder = new OTelProtoCodec.OTelProtoDecoder(); + AtomicInteger droppedCounter = new AtomicInteger(0); + + for (Record rec : records) { + Record newRecord = (Record)rec; + if ((rec.getData() instanceof Event)) { + if (otelMetricsRawProcessorConfig.getFlattenAttributesFlag() || + !otelMetricsRawProcessorConfig.getCalculateHistogramBuckets() || + !otelMetricsRawProcessorConfig.getCalculateExponentialHistogramBuckets()) { + modifyRecord(newRecord, otelMetricsRawProcessorConfig.getFlattenAttributesFlag(), otelMetricsRawProcessorConfig.getCalculateHistogramBuckets(), otelMetricsRawProcessorConfig.getCalculateExponentialHistogramBuckets()); + } + } + recordsOut.add(newRecord); - if (otelMetricsRawProcessorConfig.getCalculateExponentialHistogramBuckets()) { - builder.withPositiveBuckets(OTelProtoCodec.createExponentialBuckets(dp.getPositive(), dp.getScale())); - builder.withNegativeBuckets(OTelProtoCodec.createExponentialBuckets(dp.getNegative(), dp.getScale())); - } + if (!(rec.getData() instanceof ExportMetricsServiceRequest)) { + continue; + } - return builder.build(flattenAttributesFlag); - }) - .map(Record::new) - .collect(Collectors.toList()); + ExportMetricsServiceRequest request = ((Record)rec).getData(); + recordsOut.addAll(otelProtoDecoder.parseExportMetricsServiceRequest(request, droppedCounter, otelMetricsRawProcessorConfig.getExponentialHistogramMaxAllowedScale(), otelMetricsRawProcessorConfig.getCalculateHistogramBuckets(), otelMetricsRawProcessorConfig.getCalculateExponentialHistogramBuckets(), flattenAttributesFlag)); + } + recordsDroppedMetricsRawCounter.increment(droppedCounter.get()); + return recordsOut; } - @Override public void prepareForShutdown() { } diff --git a/data-prepper-plugins/otel-metrics-raw-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/OtelMetricsRawProcessorConfig.java b/data-prepper-plugins/otel-metrics-raw-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/OtelMetricsRawProcessorConfig.java index b74460d0f7..9935cc9218 100644 --- a/data-prepper-plugins/otel-metrics-raw-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/OtelMetricsRawProcessorConfig.java +++ b/data-prepper-plugins/otel-metrics-raw-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/OtelMetricsRawProcessorConfig.java @@ -4,6 +4,8 @@ */ package org.opensearch.dataprepper.plugins.processor.otelmetrics; + +import static org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec.DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE; import com.fasterxml.jackson.annotation.JsonProperty; public class OtelMetricsRawProcessorConfig { @@ -15,7 +17,7 @@ public class OtelMetricsRawProcessorConfig { private Boolean calculateExponentialHistogramBuckets = true; - private Integer exponentialHistogramMaxAllowedScale = 10; + private Integer exponentialHistogramMaxAllowedScale = DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE; public Boolean getCalculateExponentialHistogramBuckets() { return calculateExponentialHistogramBuckets; diff --git a/data-prepper-plugins/otel-metrics-source/build.gradle b/data-prepper-plugins/otel-metrics-source/build.gradle index abe038c645..6372395a81 100644 --- a/data-prepper-plugins/otel-metrics-source/build.gradle +++ b/data-prepper-plugins/otel-metrics-source/build.gradle @@ -13,6 +13,7 @@ dependencies { implementation project(':data-prepper-plugins:blocking-buffer') implementation libs.commons.codec implementation project(':data-prepper-plugins:armeria-common') + implementation project(':data-prepper-plugins:otel-proto-common') testImplementation project(':data-prepper-api').sourceSets.test.output implementation libs.opentelemetry.proto implementation libs.commons.io diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java index 8da1ad63f7..1efc4a7549 100644 --- a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java +++ b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java @@ -19,6 +19,7 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.codec.ByteDecoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,13 +38,16 @@ public class OTelMetricsGrpcService extends MetricsServiceGrpc.MetricsServiceImp private final Counter successRequestsCounter; private final DistributionSummary payloadSizeSummary; private final Timer requestProcessDuration; + private final ByteDecoder byteDecoder; public OTelMetricsGrpcService(int bufferWriteTimeoutInMillis, Buffer> buffer, + final ByteDecoder byteDecoder, final PluginMetrics pluginMetrics) { this.bufferWriteTimeoutInMillis = bufferWriteTimeoutInMillis; this.buffer = buffer; + this.byteDecoder = byteDecoder; requestsReceivedCounter = pluginMetrics.counter(REQUESTS_RECEIVED); successRequestsCounter = pluginMetrics.counter(SUCCESS_REQUESTS); @@ -70,7 +74,11 @@ public void export(final ExportMetricsServiceRequest request, final StreamObserv private void processRequest(final ExportMetricsServiceRequest request, final StreamObserver responseObserver) { try { - buffer.write(new Record<>(request), bufferWriteTimeoutInMillis); + if (buffer.isByteBuffer()) { + buffer.writeBytes(request.toByteArray(), null, bufferWriteTimeoutInMillis); + } else { + buffer.write(new Record<>(request), bufferWriteTimeoutInMillis); + } } catch (Exception e) { if (ServiceRequestContext.current().isTimedOut()) { LOG.warn("Exception writing to buffer but request already timed out.", e); diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java index fcfd9524d9..08c7432d98 100644 --- a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java +++ b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java @@ -32,6 +32,8 @@ import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.Source; +import org.opensearch.dataprepper.model.codec.ByteDecoder; +import org.opensearch.dataprepper.plugins.otel.codec.OTelMetricDecoder; import org.opensearch.dataprepper.plugins.certificate.CertificateProvider; import org.opensearch.dataprepper.plugins.certificate.model.Certificate; import org.opensearch.dataprepper.plugins.health.HealthGrpcService; @@ -62,6 +64,7 @@ public class OTelMetricsSource implements Source> buffer) { final OTelMetricsGrpcService oTelMetricsGrpcService = new OTelMetricsGrpcService( (int) (oTelMetricsSourceConfig.getRequestTimeoutInMillis() * 0.8), buffer, + byteDecoder, pluginMetrics ); diff --git a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelMetricDecoder.java b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelMetricDecoder.java new file mode 100644 index 0000000000..6f3f3a2b6e --- /dev/null +++ b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelMetricDecoder.java @@ -0,0 +1,40 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.otel.codec; + +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; +import org.opensearch.dataprepper.model.codec.ByteDecoder; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.metric.Metric; + +import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec; + +import java.util.Collection; +import java.util.concurrent.atomic.AtomicInteger; +import java.io.IOException; +import java.io.InputStream; +import java.util.function.Consumer; + + +public class OTelMetricDecoder implements ByteDecoder { + private final OTelProtoCodec.OTelProtoDecoder otelProtoDecoder; + public OTelMetricDecoder() { + otelProtoDecoder = new OTelProtoCodec.OTelProtoDecoder(); + } + public void parse(InputStream inputStream, Consumer> eventConsumer) throws IOException { + ExportMetricsServiceRequest request = ExportMetricsServiceRequest.parseFrom(inputStream); + AtomicInteger droppedCounter = new AtomicInteger(0); + Collection> records = + otelProtoDecoder.parseExportMetricsServiceRequest(request, droppedCounter, OTelProtoCodec.DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE, true, true, false); + for (Record record: records) { + final JacksonEvent event = JacksonEvent.fromEvent(record.getData()); + eventConsumer.accept(new Record<>(event)); + } + } + +} diff --git a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodec.java b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodec.java index 16f596c989..5428f80152 100644 --- a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodec.java +++ b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodec.java @@ -8,6 +8,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.protobuf.ByteString; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; import io.opentelemetry.proto.common.v1.AnyValue; @@ -19,6 +20,9 @@ import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint; import io.opentelemetry.proto.metrics.v1.NumberDataPoint; import io.opentelemetry.proto.metrics.v1.SummaryDataPoint; +import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.metrics.v1.ScopeMetrics; import io.opentelemetry.proto.resource.v1.Resource; import io.opentelemetry.proto.trace.v1.InstrumentationLibrarySpans; import io.opentelemetry.proto.trace.v1.ResourceSpans; @@ -26,6 +30,7 @@ import io.opentelemetry.proto.trace.v1.Status; import org.apache.commons.codec.DecoderException; import org.apache.commons.codec.binary.Hex; +import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.log.JacksonOtelLog; import org.opensearch.dataprepper.model.log.OpenTelemetryLog; import org.opensearch.dataprepper.model.metric.Bucket; @@ -34,6 +39,12 @@ import org.opensearch.dataprepper.model.metric.DefaultQuantile; import org.opensearch.dataprepper.model.metric.Exemplar; import org.opensearch.dataprepper.model.metric.Quantile; +import org.opensearch.dataprepper.model.metric.JacksonExponentialHistogram; +import org.opensearch.dataprepper.model.metric.JacksonGauge; +import org.opensearch.dataprepper.model.metric.JacksonHistogram; +import org.opensearch.dataprepper.model.metric.JacksonSum; +import org.opensearch.dataprepper.model.metric.JacksonSummary; +import org.opensearch.dataprepper.model.metric.Metric; import org.opensearch.dataprepper.model.trace.DefaultLink; import org.opensearch.dataprepper.model.trace.DefaultSpanEvent; import org.slf4j.Logger; @@ -57,6 +68,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -67,6 +79,7 @@ public class OTelProtoCodec { private static final Logger LOG = LoggerFactory.getLogger(OTelProtoCodec.class); + public static final int DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE = 10; private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final long NANO_MULTIPLIER = 1_000 * 1_000 * 1_000; @@ -147,6 +160,7 @@ public static long timeISO8601ToNanos(final String timeISO08601) { } public static class OTelProtoDecoder { + public List parseExportTraceServiceRequest(final ExportTraceServiceRequest exportTraceServiceRequest) { return exportTraceServiceRequest.getResourceSpansList().stream() .flatMap(rs -> parseResourceSpans(rs).stream()).collect(Collectors.toList()); @@ -445,6 +459,268 @@ protected Optional getServiceName(final Resource resource) { && !keyValue.getValue().getStringValue().isEmpty() ).findFirst().map(i -> i.getValue().getStringValue()); } + + public Collection> parseExportMetricsServiceRequest( + final ExportMetricsServiceRequest request, + AtomicInteger droppedCounter, + final Integer exponentialHistogramMaxAllowedScale, + final boolean calculateHistogramBuckets, + final boolean calculateExponentialHistogramBuckets, + final boolean flattenAttributes) { + Collection> recordsOut = new ArrayList<>(); + for (ResourceMetrics rs : request.getResourceMetricsList()) { + final String schemaUrl = rs.getSchemaUrl(); + final Map resourceAttributes = OTelProtoCodec.getResourceAttributes(rs.getResource()); + final String serviceName = OTelProtoCodec.getServiceName(rs.getResource()).orElse(null); + + for (InstrumentationLibraryMetrics is : rs.getInstrumentationLibraryMetricsList()) { + final Map ils = OTelProtoCodec.getInstrumentationLibraryAttributes(is.getInstrumentationLibrary()); + recordsOut.addAll(processMetricsList(is.getMetricsList(), serviceName, ils, resourceAttributes, schemaUrl, droppedCounter, exponentialHistogramMaxAllowedScale, calculateHistogramBuckets, calculateExponentialHistogramBuckets, flattenAttributes)); + } + + for (ScopeMetrics sm : rs.getScopeMetricsList()) { + final Map ils = OTelProtoCodec.getInstrumentationScopeAttributes(sm.getScope()); + recordsOut.addAll(processMetricsList(sm.getMetricsList(), serviceName, ils, resourceAttributes, schemaUrl, droppedCounter, exponentialHistogramMaxAllowedScale, calculateHistogramBuckets, calculateExponentialHistogramBuckets, flattenAttributes)); + } + } + return recordsOut; + } + + private List> processMetricsList( + final List metricsList, + final String serviceName, + final Map ils, + final Map resourceAttributes, + final String schemaUrl, + AtomicInteger droppedCounter, + final Integer exponentialHistogramMaxAllowedScale, + final boolean calculateHistogramBuckets, + final boolean calculateExponentialHistogramBuckets, + final boolean flattenAttributes) { + List> recordsOut = new ArrayList<>(); + for (io.opentelemetry.proto.metrics.v1.Metric metric : metricsList) { + try { + if (metric.hasGauge()) { + recordsOut.addAll(mapGauge(metric, serviceName, ils, resourceAttributes, schemaUrl, flattenAttributes)); + } else if (metric.hasSum()) { + recordsOut.addAll(mapSum(metric, serviceName, ils, resourceAttributes, schemaUrl, flattenAttributes)); + } else if (metric.hasSummary()) { + recordsOut.addAll(mapSummary(metric, serviceName, ils, resourceAttributes, schemaUrl, flattenAttributes)); + } else if (metric.hasHistogram()) { + recordsOut.addAll(mapHistogram(metric, serviceName, ils, resourceAttributes, schemaUrl, calculateHistogramBuckets, flattenAttributes)); + } else if (metric.hasExponentialHistogram()) { + recordsOut.addAll(mapExponentialHistogram(metric, serviceName, ils, resourceAttributes, schemaUrl, exponentialHistogramMaxAllowedScale, calculateExponentialHistogramBuckets, flattenAttributes)); + } + } catch (Exception e) { + LOG.warn("Error while processing metrics", e); + droppedCounter.incrementAndGet(); + } + } + return recordsOut; + } + + private List> mapGauge( + io.opentelemetry.proto.metrics.v1.Metric metric, + String serviceName, + final Map ils, + final Map resourceAttributes, + final String schemaUrl, + final boolean flattenAttributes) { + return metric.getGauge().getDataPointsList().stream() + .map(dp -> JacksonGauge.builder() + .withUnit(metric.getUnit()) + .withName(metric.getName()) + .withDescription(metric.getDescription()) + .withStartTime(OTelProtoCodec.getStartTimeISO8601(dp)) + .withTime(OTelProtoCodec.getTimeISO8601(dp)) + .withServiceName(serviceName) + .withValue(OTelProtoCodec.getValueAsDouble(dp)) + .withAttributes(OTelProtoCodec.mergeAllAttributes( + Arrays.asList( + OTelProtoCodec.convertKeysOfDataPointAttributes(dp), + resourceAttributes, + ils + ) + )) + .withSchemaUrl(schemaUrl) + .withExemplars(OTelProtoCodec.convertExemplars(dp.getExemplarsList())) + .withFlags(dp.getFlags()) + .build(flattenAttributes)) + .map(Record::new) + .collect(Collectors.toList()); + } + + private List> mapSum( + final io.opentelemetry.proto.metrics.v1.Metric metric, + final String serviceName, + final Map ils, + final Map resourceAttributes, + final String schemaUrl, + final boolean flattenAttributes) { + return metric.getSum().getDataPointsList().stream() + .map(dp -> JacksonSum.builder() + .withUnit(metric.getUnit()) + .withName(metric.getName()) + .withDescription(metric.getDescription()) + .withStartTime(OTelProtoCodec.getStartTimeISO8601(dp)) + .withTime(OTelProtoCodec.getTimeISO8601(dp)) + .withServiceName(serviceName) + .withIsMonotonic(metric.getSum().getIsMonotonic()) + .withValue(OTelProtoCodec.getValueAsDouble(dp)) + .withAggregationTemporality(metric.getSum().getAggregationTemporality().toString()) + .withAttributes(OTelProtoCodec.mergeAllAttributes( + Arrays.asList( + OTelProtoCodec.convertKeysOfDataPointAttributes(dp), + resourceAttributes, + ils + ) + )) + .withSchemaUrl(schemaUrl) + .withExemplars(OTelProtoCodec.convertExemplars(dp.getExemplarsList())) + .withFlags(dp.getFlags()) + .build(flattenAttributes)) + .map(Record::new) + .collect(Collectors.toList()); + } + + private List> mapSummary( + final io.opentelemetry.proto.metrics.v1.Metric metric, + final String serviceName, + final Map ils, + final Map resourceAttributes, + final String schemaUrl, + final boolean flattenAttributes) { + return metric.getSummary().getDataPointsList().stream() + .map(dp -> JacksonSummary.builder() + .withUnit(metric.getUnit()) + .withName(metric.getName()) + .withDescription(metric.getDescription()) + .withStartTime(OTelProtoCodec.convertUnixNanosToISO8601(dp.getStartTimeUnixNano())) + .withTime(OTelProtoCodec.convertUnixNanosToISO8601(dp.getTimeUnixNano())) + .withServiceName(serviceName) + .withCount(dp.getCount()) + .withSum(dp.getSum()) + .withQuantiles(OTelProtoCodec.getQuantileValues(dp.getQuantileValuesList())) + .withQuantilesValueCount(dp.getQuantileValuesCount()) + .withAttributes(OTelProtoCodec.mergeAllAttributes( + Arrays.asList( + OTelProtoCodec.unpackKeyValueList(dp.getAttributesList()), + resourceAttributes, + ils + ) + )) + .withSchemaUrl(schemaUrl) + .withFlags(dp.getFlags()) + .build(flattenAttributes)) + .map(Record::new) + .collect(Collectors.toList()); + } + + private List> mapHistogram( + final io.opentelemetry.proto.metrics.v1.Metric metric, + final String serviceName, + final Map ils, + final Map resourceAttributes, + final String schemaUrl, + final boolean calculateHistogramBuckets, + final boolean flattenAttributes) { + return metric.getHistogram().getDataPointsList().stream() + .map(dp -> { + JacksonHistogram.Builder builder = JacksonHistogram.builder() + .withUnit(metric.getUnit()) + .withName(metric.getName()) + .withDescription(metric.getDescription()) + .withStartTime(OTelProtoCodec.convertUnixNanosToISO8601(dp.getStartTimeUnixNano())) + .withTime(OTelProtoCodec.convertUnixNanosToISO8601(dp.getTimeUnixNano())) + .withServiceName(serviceName) + .withSum(dp.getSum()) + .withCount(dp.getCount()) + .withBucketCount(dp.getBucketCountsCount()) + .withExplicitBoundsCount(dp.getExplicitBoundsCount()) + .withAggregationTemporality(metric.getHistogram().getAggregationTemporality().toString()) + .withBucketCountsList(dp.getBucketCountsList()) + .withExplicitBoundsList(dp.getExplicitBoundsList()) + .withAttributes(OTelProtoCodec.mergeAllAttributes( + Arrays.asList( + OTelProtoCodec.unpackKeyValueList(dp.getAttributesList()), + resourceAttributes, + ils + ) + )) + .withSchemaUrl(schemaUrl) + .withExemplars(OTelProtoCodec.convertExemplars(dp.getExemplarsList())) + .withFlags(dp.getFlags()); + if (calculateHistogramBuckets) { + builder.withBuckets(OTelProtoCodec.createBuckets(dp.getBucketCountsList(), dp.getExplicitBoundsList())); + } + JacksonHistogram jh = builder.build(flattenAttributes); + return jh; + + }) + .map(Record::new) + .collect(Collectors.toList()); + } + + private List> mapExponentialHistogram( + final io.opentelemetry.proto.metrics.v1.Metric metric, + final String serviceName, + final Map ils, + final Map resourceAttributes, + final String schemaUrl, + final Integer exponentialHistogramMaxAllowedScale, + final boolean calculateExponentialHistogramBuckets, + final boolean flattenAttributes) { + return metric.getExponentialHistogram() + .getDataPointsList() + .stream() + .filter(dp -> { + if (calculateExponentialHistogramBuckets && + exponentialHistogramMaxAllowedScale < Math.abs(dp.getScale())){ + LOG.error("Exponential histogram can not be processed since its scale of {} is bigger than the configured max of {}.", dp.getScale(), exponentialHistogramMaxAllowedScale); + return false; + } else { + return true; + } + }) + .map(dp -> { + JacksonExponentialHistogram.Builder builder = JacksonExponentialHistogram.builder() + .withUnit(metric.getUnit()) + .withName(metric.getName()) + .withDescription(metric.getDescription()) + .withStartTime(OTelProtoCodec.convertUnixNanosToISO8601(dp.getStartTimeUnixNano())) + .withTime(OTelProtoCodec.convertUnixNanosToISO8601(dp.getTimeUnixNano())) + .withServiceName(serviceName) + .withSum(dp.getSum()) + .withCount(dp.getCount()) + .withZeroCount(dp.getZeroCount()) + .withScale(dp.getScale()) + .withPositive(dp.getPositive().getBucketCountsList()) + .withPositiveOffset(dp.getPositive().getOffset()) + .withNegative(dp.getNegative().getBucketCountsList()) + .withNegativeOffset(dp.getNegative().getOffset()) + .withAggregationTemporality(metric.getHistogram().getAggregationTemporality().toString()) + .withAttributes(OTelProtoCodec.mergeAllAttributes( + Arrays.asList( + OTelProtoCodec.unpackKeyValueList(dp.getAttributesList()), + resourceAttributes, + ils + ) + )) + .withSchemaUrl(schemaUrl) + .withExemplars(OTelProtoCodec.convertExemplars(dp.getExemplarsList())) + .withFlags(dp.getFlags()); + + if (calculateExponentialHistogramBuckets) { + builder.withPositiveBuckets(OTelProtoCodec.createExponentialBuckets(dp.getPositive(), dp.getScale())); + builder.withNegativeBuckets(OTelProtoCodec.createExponentialBuckets(dp.getNegative(), dp.getScale())); + } + + return builder.build(flattenAttributes); + }) + .map(Record::new) + .collect(Collectors.toList()); + } + } public static class OTelProtoEncoder { From b9d3fb05429711c9571b42c6b3558823990326e9 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Wed, 8 Nov 2023 21:09:55 +0000 Subject: [PATCH 2/2] Added tests and fixed test failures Signed-off-by: Krishna Kondaka --- .../otelmetrics/OTelMetricsRawProcessor.java | 4 +- .../otelmetrics/MetricsPluginSumTest.java | 15 ++-- .../otelmetrics/MetricsPluginSummaryTest.java | 8 +- .../otelmetrics/OTelMetricsGrpcService.java | 4 - .../source/otelmetrics/OTelMetricsSource.java | 1 - .../plugins/otel/codec/OTelMetricDecoder.java | 1 - .../plugins/otel/codec/OTelProtoCodec.java | 2 +- .../otel/codec/OTelProtoCodecTest.java | 85 ++++++++++++++++++- .../test/resources/test-gauge-metrics.json | 44 ++++++++++ .../resources/test-histogram-metrics.json | 50 +++++++++++ .../src/test/resources/test-sum-metrics.json | 45 ++++++++++ 11 files changed, 240 insertions(+), 19 deletions(-) create mode 100644 data-prepper-plugins/otel-proto-common/src/test/resources/test-gauge-metrics.json create mode 100644 data-prepper-plugins/otel-proto-common/src/test/resources/test-histogram-metrics.json create mode 100644 data-prepper-plugins/otel-proto-common/src/test/resources/test-sum-metrics.json diff --git a/data-prepper-plugins/otel-metrics-raw-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/OTelMetricsRawProcessor.java b/data-prepper-plugins/otel-metrics-raw-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/OTelMetricsRawProcessor.java index 3e8f0874a8..679eef3224 100644 --- a/data-prepper-plugins/otel-metrics-raw-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/OTelMetricsRawProcessor.java +++ b/data-prepper-plugins/otel-metrics-raw-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/OTelMetricsRawProcessor.java @@ -83,15 +83,15 @@ public Collection> doExecute(Collection> reco AtomicInteger droppedCounter = new AtomicInteger(0); for (Record rec : records) { - Record newRecord = (Record)rec; if ((rec.getData() instanceof Event)) { + Record newRecord = (Record)rec; if (otelMetricsRawProcessorConfig.getFlattenAttributesFlag() || !otelMetricsRawProcessorConfig.getCalculateHistogramBuckets() || !otelMetricsRawProcessorConfig.getCalculateExponentialHistogramBuckets()) { modifyRecord(newRecord, otelMetricsRawProcessorConfig.getFlattenAttributesFlag(), otelMetricsRawProcessorConfig.getCalculateHistogramBuckets(), otelMetricsRawProcessorConfig.getCalculateExponentialHistogramBuckets()); } + recordsOut.add(newRecord); } - recordsOut.add(newRecord); if (!(rec.getData() instanceof ExportMetricsServiceRequest)) { continue; diff --git a/data-prepper-plugins/otel-metrics-raw-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/MetricsPluginSumTest.java b/data-prepper-plugins/otel-metrics-raw-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/MetricsPluginSumTest.java index e202219ae1..9c6341f5da 100644 --- a/data-prepper-plugins/otel-metrics-raw-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/MetricsPluginSumTest.java +++ b/data-prepper-plugins/otel-metrics-raw-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/MetricsPluginSumTest.java @@ -24,11 +24,12 @@ import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; import org.opensearch.dataprepper.model.configuration.PluginSetting; -import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.metric.JacksonMetric; import java.util.Arrays; import java.util.Collections; +import java.util.Collection; import java.util.List; import java.util.Map; @@ -102,8 +103,9 @@ public void test() throws JsonProcessingException { Record record = new Record<>(exportMetricRequest); - List> rec = (List>) rawProcessor.doExecute(Arrays.asList(record)); - Record firstRecord = rec.get(0); + Collection> records = Arrays.asList((Record)record); + List> outputRecords = (List>)rawProcessor.doExecute(records); + Record firstRecord = (Record)outputRecords.get(0); ObjectMapper objectMapper = new ObjectMapper(); Map map = objectMapper.readValue(firstRecord.getData().toJsonString(), Map.class); @@ -182,8 +184,11 @@ public void missingNameInvalidMetricTest() throws JsonProcessingException { Record record = new Record<>(exportMetricRequest); Record invalidRecord = new Record<>(exportMetricRequestWithInvalidMetric); - List> rec = (List>) rawProcessor.doExecute(Arrays.asList(record, invalidRecord)); - org.hamcrest.MatcherAssert.assertThat(rec.size(), equalTo(1)); + Collection> records = Arrays.asList((Record)record, invalidRecord); + List> outputRecords = (List>)rawProcessor.doExecute(records); + + //List> rec = (List>) rawProcessor.doExecute(Arrays.asList(record, invalidRecord)); + org.hamcrest.MatcherAssert.assertThat(outputRecords.size(), equalTo(1)); } private void assertSumProcessing(Map map) { diff --git a/data-prepper-plugins/otel-metrics-raw-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/MetricsPluginSummaryTest.java b/data-prepper-plugins/otel-metrics-raw-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/MetricsPluginSummaryTest.java index d915a48e37..234765e740 100644 --- a/data-prepper-plugins/otel-metrics-raw-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/MetricsPluginSummaryTest.java +++ b/data-prepper-plugins/otel-metrics-raw-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/MetricsPluginSummaryTest.java @@ -21,11 +21,12 @@ import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; import org.opensearch.dataprepper.model.configuration.PluginSetting; -import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.metric.JacksonMetric; import java.util.Arrays; import java.util.Collections; +import java.util.Collection; import java.util.List; import java.util.Map; @@ -83,8 +84,9 @@ public void testSummaryProcessing() throws JsonProcessingException { Record record = new Record<>(exportMetricRequest); - List> rec = (List>) rawProcessor.doExecute(Arrays.asList(record)); - Record firstRecord = rec.get(0); + Collection> records = Arrays.asList((Record)record); + List> outputRecords = (List>)rawProcessor.doExecute(records); + Record firstRecord = (Record)outputRecords.get(0); ObjectMapper objectMapper = new ObjectMapper(); Map map = objectMapper.readValue(firstRecord.getData().toJsonString(), Map.class); diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java index 1efc4a7549..0177a57584 100644 --- a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java +++ b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java @@ -19,7 +19,6 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.model.codec.ByteDecoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,16 +37,13 @@ public class OTelMetricsGrpcService extends MetricsServiceGrpc.MetricsServiceImp private final Counter successRequestsCounter; private final DistributionSummary payloadSizeSummary; private final Timer requestProcessDuration; - private final ByteDecoder byteDecoder; public OTelMetricsGrpcService(int bufferWriteTimeoutInMillis, Buffer> buffer, - final ByteDecoder byteDecoder, final PluginMetrics pluginMetrics) { this.bufferWriteTimeoutInMillis = bufferWriteTimeoutInMillis; this.buffer = buffer; - this.byteDecoder = byteDecoder; requestsReceivedCounter = pluginMetrics.counter(REQUESTS_RECEIVED); successRequestsCounter = pluginMetrics.counter(SUCCESS_REQUESTS); diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java index 08c7432d98..33c4023e67 100644 --- a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java +++ b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java @@ -101,7 +101,6 @@ public void start(Buffer> buffer) { final OTelMetricsGrpcService oTelMetricsGrpcService = new OTelMetricsGrpcService( (int) (oTelMetricsSourceConfig.getRequestTimeoutInMillis() * 0.8), buffer, - byteDecoder, pluginMetrics ); diff --git a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelMetricDecoder.java b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelMetricDecoder.java index 6f3f3a2b6e..bdb51cada1 100644 --- a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelMetricDecoder.java +++ b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelMetricDecoder.java @@ -12,7 +12,6 @@ import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.metric.Metric; -import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec; import java.util.Collection; import java.util.concurrent.atomic.AtomicInteger; diff --git a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodec.java b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodec.java index 5428f80152..dba17b0851 100644 --- a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodec.java +++ b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodec.java @@ -663,7 +663,7 @@ private List> mapHistogram( private List> mapExponentialHistogram( final io.opentelemetry.proto.metrics.v1.Metric metric, - final String serviceName, + final String serviceName, final Map ils, final Map resourceAttributes, final String schemaUrl, diff --git a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodecTest.java b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodecTest.java index afc4cf2ab3..6c9a167ad5 100644 --- a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodecTest.java +++ b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodecTest.java @@ -12,6 +12,7 @@ import com.google.protobuf.util.JsonFormat; import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.proto.common.v1.AnyValue; import io.opentelemetry.proto.common.v1.ArrayValue; import io.opentelemetry.proto.common.v1.InstrumentationLibrary; @@ -34,6 +35,12 @@ import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.model.log.OpenTelemetryLog; import org.opensearch.dataprepper.model.metric.Bucket; +import org.opensearch.dataprepper.model.metric.Metric; +import org.opensearch.dataprepper.model.metric.JacksonMetric; +import org.opensearch.dataprepper.model.metric.JacksonGauge; +import org.opensearch.dataprepper.model.metric.JacksonSum; +import org.opensearch.dataprepper.model.metric.JacksonHistogram; +import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.trace.DefaultLink; import org.opensearch.dataprepper.model.trace.DefaultSpanEvent; import org.opensearch.dataprepper.model.trace.DefaultTraceGroupFields; @@ -56,12 +63,14 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Random; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.entry; @@ -84,9 +93,10 @@ public class OTelProtoCodecTest { private static final String TEST_REQUEST_BOTH_SPAN_TYPES_JSON_FILE = "test-request-both-span-types.json"; private static final String TEST_REQUEST_NO_SPANS_JSON_FILE = "test-request-no-spans.json"; private static final String TEST_SPAN_EVENT_JSON_FILE = "test-span-event.json"; - + private static final String TEST_REQUEST_GAUGE_METRICS_JSON_FILE = "test-gauge-metrics.json"; + private static final String TEST_REQUEST_SUM_METRICS_JSON_FILE = "test-sum-metrics.json"; + private static final String TEST_REQUEST_HISTOGRAM_METRICS_JSON_FILE = "test-histogram-metrics.json"; private static final String TEST_REQUEST_LOGS_JSON_FILE = "test-request-log.json"; - private static final String TEST_REQUEST_LOGS_IS_JSON_FILE = "test-request-log-is.json"; @@ -124,6 +134,12 @@ private ExportLogsServiceRequest buildExportLogsServiceRequestFromJsonFile(Strin return builder.build(); } + private ExportMetricsServiceRequest buildExportMetricsServiceRequestFromJsonFile(String requestJsonFileName) throws IOException { + final ExportMetricsServiceRequest.Builder builder = ExportMetricsServiceRequest.newBuilder(); + JsonFormat.parser().merge(getFileAsJsonString(requestJsonFileName), builder); + return builder.build(); + } + private String getFileAsJsonString(String requestJsonFileName) throws IOException { final StringBuilder jsonBuilder = new StringBuilder(); try (final InputStream inputStream = Objects.requireNonNull( @@ -460,6 +476,71 @@ public void testParseExportLogsServiceRequest_InstrumentationLibrarySpans() thro validateSpans(spans); } + @Test + public void testParseExportMetricsServiceRequest_Guage() throws IOException { + final ExportMetricsServiceRequest exportMetricsServiceRequest = buildExportMetricsServiceRequestFromJsonFile(TEST_REQUEST_GAUGE_METRICS_JSON_FILE); + AtomicInteger droppedCount = new AtomicInteger(0); + final Collection> metrics = decoderUnderTest.parseExportMetricsServiceRequest(exportMetricsServiceRequest, droppedCount, 10, true, true, true); + + validateGaugeMetricRequest(metrics); + } + + @Test + public void testParseExportMetricsServiceRequest_Sum() throws IOException { + final ExportMetricsServiceRequest exportMetricsServiceRequest = buildExportMetricsServiceRequestFromJsonFile(TEST_REQUEST_SUM_METRICS_JSON_FILE); + AtomicInteger droppedCount = new AtomicInteger(0); + final Collection> metrics = decoderUnderTest.parseExportMetricsServiceRequest(exportMetricsServiceRequest, droppedCount, 10, true, true, true); + validateSumMetricRequest(metrics); + } + + @Test + public void testParseExportMetricsServiceRequest_Histogram() throws IOException { + final ExportMetricsServiceRequest exportMetricsServiceRequest = buildExportMetricsServiceRequestFromJsonFile(TEST_REQUEST_HISTOGRAM_METRICS_JSON_FILE); + AtomicInteger droppedCount = new AtomicInteger(0); + final Collection> metrics = decoderUnderTest.parseExportMetricsServiceRequest(exportMetricsServiceRequest, droppedCount, 10, true, true, true); + validateHistogramMetricRequest(metrics); + } + + private void validateGaugeMetricRequest(Collection> metrics) { + assertThat(metrics.size(), equalTo(1)); + Record record = ((List>)metrics).get(0); + JacksonMetric metric = (JacksonMetric) record.getData(); + assertThat(metric.getKind(), equalTo(Metric.KIND.GAUGE.toString())); + assertThat(metric.getUnit(), equalTo("1")); + assertThat(metric.getName(), equalTo("counter-int")); + JacksonGauge gauge = (JacksonGauge)metric; + assertThat(gauge.getValue(), equalTo(123.0)); + } + + private void validateSumMetricRequest(Collection> metrics) { + assertThat(metrics.size(), equalTo(1)); + Record record = ((List>)metrics).get(0); + JacksonMetric metric = (JacksonMetric) record.getData(); + assertThat(metric.getKind(), equalTo(Metric.KIND.SUM.toString())); + assertThat(metric.getUnit(), equalTo("1")); + assertThat(metric.getName(), equalTo("sum-int")); + JacksonSum sum = (JacksonSum)metric; + assertThat(sum.getValue(), equalTo(456.0)); + } + + private void validateHistogramMetricRequest(Collection> metrics) { + assertThat(metrics.size(), equalTo(1)); + Record record = ((List>)metrics).get(0); + JacksonMetric metric = (JacksonMetric) record.getData(); + assertThat(metric.getKind(), equalTo(Metric.KIND.HISTOGRAM.toString())); + assertThat(metric.getUnit(), equalTo("1")); + assertThat(metric.getName(), equalTo("histogram-int")); + JacksonHistogram histogram = (JacksonHistogram)metric; + assertThat(histogram.getSum(), equalTo(100.0)); + assertThat(histogram.getCount(), equalTo(30L)); + assertThat(histogram.getExemplars(), equalTo(Collections.emptyList())); + assertThat(histogram.getExplicitBoundsList(), equalTo(List.of(1.0, 2.0, 3.0, 4.0))); + assertThat(histogram.getExplicitBoundsCount(), equalTo(4)); + assertThat(histogram.getBucketCountsList(), equalTo(List.of(3L, 5L, 15L, 6L, 1L))); + assertThat(histogram.getBucketCount(), equalTo(5)); + assertThat(histogram.getAggregationTemporality(), equalTo("AGGREGATION_TEMPORALITY_CUMULATIVE")); + } + } @Nested diff --git a/data-prepper-plugins/otel-proto-common/src/test/resources/test-gauge-metrics.json b/data-prepper-plugins/otel-proto-common/src/test/resources/test-gauge-metrics.json new file mode 100644 index 0000000000..abca7b0b29 --- /dev/null +++ b/data-prepper-plugins/otel-proto-common/src/test/resources/test-gauge-metrics.json @@ -0,0 +1,44 @@ +{ + "resourceMetrics": [ + { + "resource": { + "attributes": [ + { + "key": "resource-attr", + "value": { + "stringValue": "resource-attr-val-1" + } + } + ] + }, + "scopeMetrics": [ + { + "scope": {}, + "metrics": [ + { + "name": "counter-int", + "unit": 1, + "gauge": { + "dataPoints": [ + { + "attributes": [ + { + "key": "label-1", + "value": { + "stringValue": "label-value-1" + } + } + ], + "startTimeUnixNano": "1581452773000000789", + "timeUnixNano": "1581452773000000789", + "asInt": "123" + } + ] + } + } + ] + } + ] + } + ] +} diff --git a/data-prepper-plugins/otel-proto-common/src/test/resources/test-histogram-metrics.json b/data-prepper-plugins/otel-proto-common/src/test/resources/test-histogram-metrics.json new file mode 100644 index 0000000000..1220de6214 --- /dev/null +++ b/data-prepper-plugins/otel-proto-common/src/test/resources/test-histogram-metrics.json @@ -0,0 +1,50 @@ +{ + "resourceMetrics": [ + { + "resource": { + "attributes": [ + { + "key": "resource-attr", + "value": { + "stringValue": "resource-attr-val-1" + } + } + ] + }, + "scopeMetrics": [ + { + "scope": {}, + "metrics": [ + { + "name": "histogram-int", + "unit": 1, + "histogram": { + "dataPoints": [ + { + "attributes": [ + { + "key": "label-1", + "value": { + "stringValue": "label-value-1" + } + } + ], + "startTimeUnixNano": "1581452773000000789", + "timeUnixNano": "1581452773000000789", + "count": "30", + "sum": "100", + "bucket_counts": [3, 5, 15, 6, 1], + "explicit_bounds": [1.0, 2.0, 3.0, 4.0], + "exemplars": [] + } + ], + "aggregationTemporality":"2" + } + } + ] + } + ] + } + ] +} + diff --git a/data-prepper-plugins/otel-proto-common/src/test/resources/test-sum-metrics.json b/data-prepper-plugins/otel-proto-common/src/test/resources/test-sum-metrics.json new file mode 100644 index 0000000000..97d3560cc6 --- /dev/null +++ b/data-prepper-plugins/otel-proto-common/src/test/resources/test-sum-metrics.json @@ -0,0 +1,45 @@ +{ + "resourceMetrics": [ + { + "resource": { + "attributes": [ + { + "key": "resource-attr", + "value": { + "stringValue": "resource-attr-val-1" + } + } + ] + }, + "scopeMetrics": [ + { + "scope": {}, + "metrics": [ + { + "name": "sum-int", + "unit": 1, + "sum": { + "dataPoints": [ + { + "attributes": [ + { + "key": "label-1", + "value": { + "stringValue": "label-value-1" + } + } + ], + "startTimeUnixNano": "1581452773000000789", + "timeUnixNano": "1581452773000000789", + "asInt": "456" + } + ] + } + } + ] + } + ] + } + ] +} +