From 7c0ba96565adcba27729787aa9849299b908197f Mon Sep 17 00:00:00 2001 From: Andrey Rusev Date: Mon, 30 Sep 2024 15:54:04 +0300 Subject: [PATCH 1/2] Sparate OTLP message flattening to allow reuse --- collector-embedded/pom.xml | 10 + .../server/collector/LogsCollector.java | 147 +++--- .../server/collector/LogsFlattener.java | 220 +++++++++ .../server/collector/MetricDataPoint.java | 10 +- .../server/collector/MetricsCollector.java | 246 +--------- .../server/collector/MetricsFlattener.java | 430 ++++++++++++++++++ .../server/collector/ProfilesCollector.java | 234 ++++------ .../server/collector/ProfilesFlattener.java | 348 ++++++++++++++ .../server/collector/TracesCollector.java | 160 +++---- .../server/collector/TracesFlattener.java | 219 +++++++++ .../server/collector/LogsFlattenerTests.java | 230 ++++++++++ mod_sun_checks.xml | 3 +- pom.xml | 13 + 13 files changed, 1713 insertions(+), 557 deletions(-) create mode 100644 collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/LogsFlattener.java create mode 100644 collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/MetricsFlattener.java create mode 100644 collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/ProfilesFlattener.java create mode 100644 collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/TracesFlattener.java create mode 100644 collector-embedded/src/test/java/io/mishmash/opentelemetry/server/collector/LogsFlattenerTests.java diff --git a/collector-embedded/pom.xml b/collector-embedded/pom.xml index 7648b57..736bdcc 100644 --- a/collector-embedded/pom.xml +++ b/collector-embedded/pom.xml @@ -90,6 +90,10 @@ org.apache.maven.plugins maven-checkstyle-plugin + + org.apache.maven.plugins + maven-surefire-plugin + @@ -114,6 +118,12 @@ io.opentelemetry.proto opentelemetry-proto + + + org.junit.jupiter + junit-jupiter-engine + test + diff --git 
a/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/LogsCollector.java b/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/LogsCollector.java index 0007d1e..7d1fb3b 100644 --- a/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/LogsCollector.java +++ b/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/LogsCollector.java @@ -17,7 +17,6 @@ package io.mishmash.opentelemetry.server.collector; -import java.util.UUID; import java.util.concurrent.ForkJoinPool; import java.util.logging.Level; import java.util.logging.Logger; @@ -29,9 +28,6 @@ import io.opentelemetry.proto.collector.logs.v1.ExportLogsPartialSuccess; import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse; -import io.opentelemetry.proto.logs.v1.LogRecord; -import io.opentelemetry.proto.logs.v1.ResourceLogs; -import io.opentelemetry.proto.logs.v1.ScopeLogs; import io.vertx.core.Vertx; /** @@ -94,94 +90,73 @@ public Batch loadBatch( Batch batch = new Batch<>(otelContext); - long timestamp = System.currentTimeMillis(); - String uuid = UUID.randomUUID().toString(); - int requestItems = 0; - for (ResourceLogs logs : request.getResourceLogsList()) { - for (ScopeLogs scopeLogs : logs.getScopeLogsList()) { - for (LogRecord log : scopeLogs.getLogRecordsList()) { - if (batch.isCancelled()) { - return batch; - } + for (Log l : new LogsFlattener( + batch, + Context.current(), + request, + Vertx.currentContext().get(VCTX_EMITTER))) { + if (batch.isCancelled()) { + return batch; + } + + /* + * FIXME: check if is valid and add an error message + * (but still allow it to go to subscribers) + */ - Span recordSpan = getInstrumentation() - .startNewSpan("otel.record"); + requestItems++; + l.addAll(getSubscribers()); + l.setLoaded(); + batch.add(l); - Log l = new Log(batch, - Context.current(), - 
Vertx.currentContext().get(VCTX_EMITTER)); - l.setFrom( - timestamp, - uuid, - requestItems++, - logs, - scopeLogs, - log); - - /* - * FIXME: check if is valid and add an error message - * (but still allow it to go to subscribers) - */ - - l.addAll(getSubscribers()); - l.setLoaded(); - batch.add(l); - - recordSpan.addEvent("Request item loaded"); - - int estimatedLag = offer( - l, - (subscriber, droppedItem) -> { - /* - * set an error on this in the response, - * FIXME: use another exception class - */ - droppedItem.completeExceptionally( - new RuntimeException( - "Logs collector subscriber " - + subscriber - + " dropped a log record")); - - // droppedItem.complete(subscriber); - // do not retry - return false; - }); - - if (estimatedLag < 0) { - // says how many subscribers dropped the message - LOG.info( - String.format( - "Logs batch %s has %d drop(s)", - uuid, - (-estimatedLag))); - addDroppedRequestItems( - (-estimatedLag), - transport, - encoding); - } else if (estimatedLag == 0) { - // there were no subscribers, set an error - batch.setLoadFailed( - new IllegalStateException(""" - Logs collector currently has \ - no subscribers""")); - LOG.log(Level.SEVERE, """ - Logs batch load failed, logs collector \ - currently has no subscribers. 
\ - Batch id: """ - + uuid); - - return batch; - // } else { + int estimatedLag = offer( + l, + (subscriber, droppedItem) -> { /* - * positive number is the estimated lag - number - * of items submitted but not yet consumed + * set an error on this in the response, + * FIXME: use another exception class */ - - // LOG.info("Logs estimated lag: " + estimatedLag); - } - } + droppedItem.completeExceptionally( + new RuntimeException( + "Logs collector subscriber " + + subscriber + + " dropped a log record")); + + // droppedItem.complete(subscriber); + // do not retry + return false; + }); + + if (estimatedLag < 0) { + // says how many subscribers dropped the message + LOG.info( + String.format( + "Logs batch has %d drop(s)", + (-estimatedLag))); + addDroppedRequestItems( + (-estimatedLag), + transport, + encoding); + } else if (estimatedLag == 0) { + // there were no subscribers, set an error + batch.setLoadFailed( + new IllegalStateException(""" + Logs collector currently has \ + no subscribers""")); + LOG.log(Level.SEVERE, """ + Logs batch load failed, logs collector \ + currently has no subscribers. """); + + return batch; + // } else { + /* + * positive number is the estimated lag - number + * of items submitted but not yet consumed + */ + + // LOG.info("Logs estimated lag: " + estimatedLag); } } diff --git a/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/LogsFlattener.java b/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/LogsFlattener.java new file mode 100644 index 0000000..c785e39 --- /dev/null +++ b/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/LogsFlattener.java @@ -0,0 +1,220 @@ +/* + * Copyright 2024 Mishmash IO UK Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.mishmash.opentelemetry.server.collector; + +import java.util.Iterator; +import java.util.UUID; + +import io.opentelemetry.context.Context; +import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; +import io.opentelemetry.proto.logs.v1.LogRecord; +import io.opentelemetry.proto.logs.v1.ResourceLogs; +import io.opentelemetry.proto.logs.v1.ScopeLogs; +import io.vertx.ext.auth.User; + +/** + * Extracts individual log messages from an OTLP packet. + * + * Turns an {@link ExportLogsServiceRequest} into + * an {@link Iterable} of {@link Log}s. + * + * The original OTLP protocol message format, as specified + * by {@link ExportLogsServiceRequest}, contains lists of + * individual {@link LogRecord}s. These lists are nested + * into lists of {@link ScopeLogs}, which are in turn + * nested inside a list of {@link ResourceLogs}. + * + * To facilitate further processing this class extracts + * (or 'flattens') the above nested structures into a 'flat' + * {@link Iterable} of individual {@link Log} instances. + * + * To see examples of 'flattened' data visit + * @see + * OpenTelemetry Basics Notebook on GitHub. + */ +public class LogsFlattener implements Iterable { + + /** + * The parent {@link Batch}. + */ + private Batch batch; + /** + * The own telemetry {@link Context}. + */ + private Context otel; + /** + * The OTLP message received from the client. + */ + private ExportLogsServiceRequest request; + /** + * An optional {@link User} if authentication was enabled. 
+ */ + private User user; + /** + * The timestamp of this batch. + */ + private long timestamp; + /** + * The batch UUID. + */ + private String uuid; + + /** + * Create a {@link Log}s flattener. + * + * @param parentBatch the parent {@link Batch} + * @param otelContext the own telemetry {@link Context} + * @param logsRequest the OTLP packet + * @param authUser the {@link User} submitting the request or null + * if authentication was not enabled + */ + public LogsFlattener( + final Batch parentBatch, + final Context otelContext, + final ExportLogsServiceRequest logsRequest, + final User authUser) { + this.batch = parentBatch; + this.otel = otelContext; + this.request = logsRequest; + this.user = authUser; + + timestamp = System.currentTimeMillis(); + uuid = UUID.randomUUID().toString(); + } + + /** + * {@inheritDoc} + */ + @Override + public Iterator iterator() { + return new LogsIterator(); + } + + /** + * The internal {@link Iterator}, as returned to user. + */ + private final class LogsIterator implements Iterator { + + /** + * Iterator over the OTLP Resource Logs. + */ + private Iterator resourceIt; + /** + * Iterator over the OTLP Scope Logs within an OTLP Resource Log. + */ + private Iterator scopeIt; + /** + * Iterator over individual OTLP Log Records within a scope. + */ + private Iterator logIt; + /** + * The current OTLP Resource Logs. + */ + private ResourceLogs currentResource; + /** + * The current OTLP Scope Logs. + */ + private ScopeLogs currentScope; + /** + * A counter of returned {@link Log}s. + */ + private int itemsCount; + + /** + * Initialize this iterator. 
+ */ + private LogsIterator() { + itemsCount = 0; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean hasNext() { + if (logIt == null || !logIt.hasNext()) { + nextScope(); + + while (currentScope != null + && !( + logIt = currentScope + .getLogRecordsList() + .iterator() + ).hasNext()) { + nextScope(); + } + } + + return logIt != null && logIt.hasNext(); + } + + /** + * {@inheritDoc} + */ + @Override + public Log next() { + LogRecord rec = logIt.next(); + Log log = new Log(batch, otel, user); + + log.setFrom( + timestamp, + uuid, + itemsCount++, + currentResource, + currentScope, + rec); + + return log; + } + + /** + * Move the nested iterators to next OTLP Scope Logs. + */ + private void nextScope() { + if (scopeIt == null || !scopeIt.hasNext()) { + nextResource(); + + while (currentResource != null + && !( + scopeIt = currentResource + .getScopeLogsList() + .iterator() + ).hasNext()) { + nextResource(); + } + } + + currentScope = scopeIt == null || !scopeIt.hasNext() + ? null + : scopeIt.next(); + } + + /** + * Move the nested iterators to next OTLP Resource Logs. + */ + private void nextResource() { + if (resourceIt == null) { + resourceIt = request.getResourceLogsList().iterator(); + } + + currentResource = resourceIt.hasNext() + ? 
resourceIt.next() + : null; + } + } +} diff --git a/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/MetricDataPoint.java b/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/MetricDataPoint.java index 854dccd..a97d9d0 100644 --- a/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/MetricDataPoint.java +++ b/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/MetricDataPoint.java @@ -162,7 +162,7 @@ public MetricDataPoint( * * @param batchTS the batch timestamp * @param batchID the batch id - * @param sequenceNum the sequence number of this span + * @param sequenceNum the sequence number of this metric * @param resourceMetric used to fill-in OpenTelemetry Resource details * @param scopeMetric used to fill-in OpenTelemetry Scope details * @param metric the metric details @@ -208,7 +208,7 @@ public void setFrom( * * @param batchTS the batch timestamp * @param batchID the batch id - * @param sequenceNum the sequence number of this span + * @param sequenceNum the sequence number of this metric * @param resourceMetric used to fill-in OpenTelemetry Resource details * @param scopeMetric used to fill-in OpenTelemetry Scope details * @param metric the metric details @@ -252,7 +252,7 @@ public void setFrom( * * @param batchTS the batch timestamp * @param batchID the batch id - * @param sequenceNum the sequence number of this span + * @param sequenceNum the sequence number of this metric * @param resourceMetric used to fill-in OpenTelemetry Resource details * @param scopeMetric used to fill-in OpenTelemetry Scope details * @param metric the metric details @@ -298,7 +298,7 @@ public void setFrom( * * @param batchTS the batch timestamp * @param batchID the batch id - * @param sequenceNum the sequence number of this span + * @param sequenceNum the sequence number of this metric * @param resourceMetric used to fill-in OpenTelemetry Resource details * @param scopeMetric used to 
fill-in OpenTelemetry Scope details * @param metric the metric details @@ -344,7 +344,7 @@ public void setFrom( * * @param batchTS the batch timestamp * @param batchID the batch id - * @param sequenceNum the sequence number of this span + * @param sequenceNum the sequence number of this metric * @param resourceMetric used to fill-in OpenTelemetry Resource details * @param scopeMetric used to fill-in OpenTelemetry Scope details * @param metric the metric details diff --git a/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/MetricsCollector.java b/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/MetricsCollector.java index 5d98b2f..76bcd16 100644 --- a/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/MetricsCollector.java +++ b/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/MetricsCollector.java @@ -18,7 +18,6 @@ package io.mishmash.opentelemetry.server.collector; import java.util.List; -import java.util.UUID; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Flow.Subscriber; import java.util.logging.Level; @@ -31,17 +30,6 @@ import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsPartialSuccess; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; -import io.opentelemetry.proto.metrics.v1.ExponentialHistogram; -import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint; -import io.opentelemetry.proto.metrics.v1.Gauge; -import io.opentelemetry.proto.metrics.v1.Histogram; -import io.opentelemetry.proto.metrics.v1.HistogramDataPoint; -import io.opentelemetry.proto.metrics.v1.NumberDataPoint; -import io.opentelemetry.proto.metrics.v1.ResourceMetrics; -import io.opentelemetry.proto.metrics.v1.ScopeMetrics; -import io.opentelemetry.proto.metrics.v1.Sum; -import io.opentelemetry.proto.metrics.v1.Summary; -import 
io.opentelemetry.proto.metrics.v1.SummaryDataPoint; import io.vertx.core.Vertx; /** @@ -105,217 +93,27 @@ public Batch loadBatch( Batch batch = new Batch<>(otelContext); - long timestamp = System.currentTimeMillis(); - String uuid = UUID.randomUUID().toString(); - - int seqNo = 0; int requestItems = 0; - for (ResourceMetrics metrics : request.getResourceMetricsList()) { - for (ScopeMetrics scopeMetrics - : metrics.getScopeMetricsList()) { - for (io.opentelemetry.proto.metrics.v1.Metric metric - : scopeMetrics.getMetricsList()) { - int dpSeqNo = 0; - - Span recordSpan = getInstrumentation() - .startNewSpan("otel.record"); - - switch (metric.getDataCase()) { - case DATA_NOT_SET: - /* - * FIXME: what to do when a metric has no - * data points? - */ - continue; - - case EXPONENTIAL_HISTOGRAM: - ExponentialHistogram exponentialHistogram = - metric.getExponentialHistogram(); - - for (ExponentialHistogramDataPoint dp - : exponentialHistogram - .getDataPointsList()) { - if (batch.isCancelled()) { - return batch; - } - - MetricDataPoint m = new MetricDataPoint( - batch, - Context.current(), - Vertx.currentContext() - .get(VCTX_EMITTER)); - m.setFrom(timestamp, - uuid, - seqNo, - metrics, - scopeMetrics, - metric, - exponentialHistogram, - dpSeqNo++, - dp); - - if (!offerDataPoint( - batch, - m, - uuid, - transport, - encoding)) { - return batch; - } - } - - break; - case GAUGE: - Gauge gauge = metric.getGauge(); - - for (NumberDataPoint dp - : gauge.getDataPointsList()) { - if (batch.isCancelled()) { - return batch; - } - - MetricDataPoint m = new MetricDataPoint( - batch, - Context.current(), - Vertx.currentContext() - .get(VCTX_EMITTER)); - m.setFrom(timestamp, - uuid, - seqNo, - metrics, - scopeMetrics, - metric, - gauge, - dpSeqNo++, - dp); - - if (!offerDataPoint( - batch, - m, - uuid, - transport, - encoding)) { - return batch; - } - } - - break; - case HISTOGRAM: - Histogram histogram = metric.getHistogram(); - - for (HistogramDataPoint dp - : 
histogram.getDataPointsList()) { - if (batch.isCancelled()) { - return batch; - } - - MetricDataPoint m = new MetricDataPoint( - batch, - Context.current(), - Vertx.currentContext() - .get(VCTX_EMITTER)); - m.setFrom(timestamp, - uuid, - seqNo, - metrics, - scopeMetrics, - metric, - histogram, - dpSeqNo++, - dp); - - if (!offerDataPoint( - batch, - m, - uuid, - transport, - encoding)) { - return batch; - } - } - - break; - case SUM: - Sum sum = metric.getSum(); - - for (NumberDataPoint dp - : sum.getDataPointsList()) { - if (batch.isCancelled()) { - return batch; - } - - MetricDataPoint m = new MetricDataPoint( - batch, - Context.current(), - Vertx.currentContext() - .get(VCTX_EMITTER)); - m.setFrom(timestamp, - uuid, - seqNo, - metrics, - scopeMetrics, - metric, - sum, - dpSeqNo++, - dp); - - if (!offerDataPoint(batch, - m, - uuid, - transport, - encoding)) { - return batch; - } - } - - break; - case SUMMARY: - Summary summary = metric.getSummary(); - - for (SummaryDataPoint dp - : summary.getDataPointsList()) { - if (batch.isCancelled()) { - return batch; - } - - MetricDataPoint m = new MetricDataPoint( - batch, - Context.current(), - Vertx.currentContext() - .get(VCTX_EMITTER)); - m.setFrom(timestamp, - uuid, - seqNo, - metrics, - scopeMetrics, - metric, - summary, - dpSeqNo++, - dp); - - if (!offerDataPoint( - batch, - m, - uuid, - transport, - encoding)) { - return batch; - } - } - - break; - default: - // FIXME: unknown type! 
- break; - } - - requestItems += dpSeqNo; - seqNo++; - - recordSpan.addEvent("Request item loaded"); - } + for (MetricDataPoint m : new MetricsFlattener( + batch, + Context.current(), + request, + Vertx.currentContext() + .get(VCTX_EMITTER))) { + if (batch.isCancelled()) { + return batch; + } + + if (!offerDataPoint( + batch, + m, + transport, + encoding)) { + return batch; } + + requestItems++; } batch.setLoaded(); @@ -331,7 +129,6 @@ public Batch loadBatch( * * @param batch the batch * @param m the data point - * @param uuid id of the batch * @param transport OTLP transport used * @param encoding OTLP transport encoding used * @return true if successful @@ -339,7 +136,6 @@ public Batch loadBatch( protected boolean offerDataPoint( final Batch batch, final MetricDataPoint m, - final String uuid, final String transport, final String encoding) { /* @@ -371,8 +167,7 @@ protected boolean offerDataPoint( // it tells how many subscribers dropped the message LOG.info( String.format( - "Metrics batch %s has %d drop(s)", - uuid, + "Metrics batch has %d drop(s)", (-estimatedLag))); addDroppedRequestItems((-estimatedLag), transport, encoding); } else if (estimatedLag == 0) { @@ -382,8 +177,7 @@ protected boolean offerDataPoint( "Metrics collector currently has no subscribers")); LOG.log(Level.SEVERE, """ Metrics batch load failed, metrics collector currently \ - has no subscribers. Batch id: """ - + uuid); + has no subscribers. Batch id: """); return false; // } else { diff --git a/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/MetricsFlattener.java b/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/MetricsFlattener.java new file mode 100644 index 0000000..a52b647 --- /dev/null +++ b/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/MetricsFlattener.java @@ -0,0 +1,430 @@ +/* + * Copyright 2024 Mishmash IO UK Ltd. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.mishmash.opentelemetry.server.collector; + +import java.util.Iterator; +import java.util.UUID; +import java.util.logging.Level; +import java.util.logging.Logger; + +import io.opentelemetry.context.Context; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; +import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint; +import io.opentelemetry.proto.metrics.v1.HistogramDataPoint; +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.NumberDataPoint; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.metrics.v1.ScopeMetrics; +import io.opentelemetry.proto.metrics.v1.SummaryDataPoint; +import io.vertx.ext.auth.User; + +/** + * Extracts individual metric data points from an OTLP packet. + * + * Turns an {@link ExportMetricsServiceRequest} into + * an {@link Iterable} of {@link MetricDataPoint}s. + * + * The original OTLP protocol message format, as specified + * by {@link ExportMetricsServiceRequest}, contains lists of + * individual metric data points. These lists are nested + * into lists of {@link Metric}s, which are in turn + * nested inside a list of {@link ScopeMetrics}, and scopes + * are again nested into a list of {@link ResourceMetrics}. 
+ * + * To facilitate further processing this class extracts + * (or 'flattens') the above nested structures into a 'flat' + * {@link Iterable} of individual {@link MetricDataPoint} instances. + * + * To see examples of 'flattened' data visit + * @see + * OpenTelemetry Basics Notebook on GitHub. + */ +public class MetricsFlattener implements Iterable { + + /** + * The Logger to use. + */ + private static final Logger LOG = Logger + .getLogger(MetricsFlattener.class.getName()); + /** + * The parent {@link Batch}. + */ + private Batch batch; + /** + * The own telemetry {@link Context}. + */ + private Context otel; + /** + * The OTLP message received from the client. + */ + private ExportMetricsServiceRequest request; + /** + * An optional {@link User} if authentication was enabled. + */ + private User user; + /** + * The timestamp of this batch. + */ + private long timestamp; + /** + * The batch UUID. + */ + private String uuid; + + /** + * Create a {@link MetricDataPoint}s flattener. + * + * @param parentBatch the parent {@link Batch} + * @param otelContext the own telemetry {@link Context} + * @param metricsRequest the OTLP packet + * @param authUser the {@link User} submitting the request or null + * if authentication was not enabled + */ + public MetricsFlattener( + final Batch parentBatch, + final Context otelContext, + final ExportMetricsServiceRequest metricsRequest, + final User authUser) { + this.batch = parentBatch; + this.otel = otelContext; + this.request = metricsRequest; + this.user = authUser; + + timestamp = System.currentTimeMillis(); + uuid = UUID.randomUUID().toString(); + } + + /** + * {@inheritDoc} + */ + @Override + public Iterator iterator() { + return new DataPointsIterator(); + } + + /** + * The internal {@link Iterator}, as returned to user. + */ + private final class DataPointsIterator + implements Iterator { + + /** + * Iterator over the OTLP Resource Metrics. 
+ */ + private Iterator resourceIt; + /** + * Iterator over the OTLP Scope Metrics within an OTLP Resource Metrics. + */ + private Iterator scopeIt; + /** + * Iterator over the OTLP Metrics within a scope. + */ + private Iterator metricIt; + /** + * Iterator over individual OTLP Exponential Histogram data points. + */ + private Iterator expHistIt; + /** + * Iterator over individual OTLP Gauge or Sum data points. + */ + private Iterator numIt; + /** + * Iterator over individual OTLP Histogram data points. + */ + private Iterator histIt; + /** + * Iterator over individual OTLP Summary data points. + */ + private Iterator summaryIt; + /** + * The current OTLP Resource Metrics. + */ + private ResourceMetrics currentResource; + /** + * The current OTLP Scope Metrics. + */ + private ScopeMetrics currentScope; + /** + * The current OTLP Metric. + */ + private Metric currentMetric; + /** + * A counter of processed metrics. + */ + private int metricsCount; + /** + * A counter of returned {@link MetricDataPoint}s. + */ + private int itemsCount; + + /** + * Initialize this iterator. 
+     */
+    private DataPointsIterator() {
+        metricsCount = 0;
+        itemsCount = 0;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean hasNext() {
+        /*
+         * Advance both on first use and when the current metric's
+         * data points are exhausted. Checking only for
+         * currentMetric == null would end the iteration after the
+         * first metric, because currentMetric is never reset to null
+         * while more metrics remain. (Same pattern as
+         * LogsFlattener.LogsIterator.hasNext().)
+         */
+        if (currentMetric == null || !hasNextDataPoint()) {
+            nextMetric();
+
+            while (currentMetric != null
+                    && !initDataPointIterator()) {
+                nextMetric();
+            }
+        }
+
+        return currentMetric != null
+                && hasNextDataPoint();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public MetricDataPoint next() {
+        MetricDataPoint dataPoint = new MetricDataPoint(batch, otel, user);
+
+        /*
+         * NOTE: initDataPointIterator() increments metricsCount as
+         * soon as a new metric is entered, so the zero-based metric
+         * sequence number expected by MetricDataPoint.setFrom() (and
+         * previously produced by MetricsCollector's seqNo counter)
+         * is (metricsCount - 1).
+         *
+         * Each case must end with a break: without it a GAUGE metric,
+         * for example, falls through the HISTOGRAM/SUM/SUMMARY cases
+         * and into the default branch, which throws.
+         */
+        switch (currentMetric.getDataCase()) {
+        case EXPONENTIAL_HISTOGRAM:
+            dataPoint.setFrom(
+                    timestamp,
+                    uuid,
+                    metricsCount - 1,
+                    currentResource,
+                    currentScope,
+                    currentMetric,
+                    currentMetric.getExponentialHistogram(),
+                    itemsCount++,
+                    expHistIt.next());
+            break;
+        case GAUGE:
+            dataPoint.setFrom(
+                    timestamp,
+                    uuid,
+                    metricsCount - 1,
+                    currentResource,
+                    currentScope,
+                    currentMetric,
+                    currentMetric.getGauge(),
+                    itemsCount++,
+                    numIt.next());
+            break;
+        case HISTOGRAM:
+            dataPoint.setFrom(
+                    timestamp,
+                    uuid,
+                    metricsCount - 1,
+                    currentResource,
+                    currentScope,
+                    currentMetric,
+                    currentMetric.getHistogram(),
+                    itemsCount++,
+                    histIt.next());
+            break;
+        case SUM:
+            dataPoint.setFrom(
+                    timestamp,
+                    uuid,
+                    metricsCount - 1,
+                    currentResource,
+                    currentScope,
+                    currentMetric,
+                    currentMetric.getSum(),
+                    itemsCount++,
+                    numIt.next());
+            break;
+        case SUMMARY:
+            dataPoint.setFrom(
+                    timestamp,
+                    uuid,
+                    metricsCount - 1,
+                    currentResource,
+                    currentScope,
+                    currentMetric,
+                    currentMetric.getSummary(),
+                    itemsCount++,
+                    summaryIt.next());
+            break;
+        default:
+            // hasNext() filters DATA_NOT_SET, so this is unreachable
+            // unless a new metric type is added to the protocol.
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Unexpected OTLP metric of type: '%s'",
+                            currentMetric.getDataCase()));
+        }
+
+        return dataPoint;
+    }
+
+    /**
+     * Check if the current metric has remaining data points.
+     *
+     * @return true when there are more data points in the
+     * current metric.
+ */ + private boolean hasNextDataPoint() { + switch (currentMetric.getDataCase()) { + case DATA_NOT_SET: + return false; + case EXPONENTIAL_HISTOGRAM: + return expHistIt != null && expHistIt.hasNext(); + case GAUGE: + case SUM: + return numIt != null && numIt.hasNext(); + case HISTOGRAM: + return histIt != null && histIt.hasNext(); + case SUMMARY: + return summaryIt != null && summaryIt.hasNext(); + default: + throw new UnsupportedOperationException( + String.format( + "Unknown OTLP metric type: '%s'", + currentMetric.getDataCase())); + } + } + + /** + * Initialize the data point iterator that corresponds + * to the current metric type. + * + * Call only once after a new metric has been obtained. + * + * @return true if the new metric has data points + */ + private boolean initDataPointIterator() { + boolean res; + + switch (currentMetric.getDataCase()) { + case DATA_NOT_SET: + // the metric type is missing, not expecting data here + LOG.log(Level.WARNING, + "Received an OTLP metric without a type, skipping"); + expHistIt = null; + numIt = null; + histIt = null; + summaryIt = null; + + return false; + case EXPONENTIAL_HISTOGRAM: + expHistIt = currentMetric + .getExponentialHistogram() + .getDataPointsList() + .iterator(); + res = expHistIt.hasNext(); + break; + case GAUGE: + numIt = currentMetric + .getGauge() + .getDataPointsList() + .iterator(); + res = numIt.hasNext(); + break; + case HISTOGRAM: + histIt = currentMetric + .getHistogram() + .getDataPointsList() + .iterator(); + res = histIt.hasNext(); + break; + case SUM: + numIt = currentMetric + .getSum() + .getDataPointsList() + .iterator(); + res = numIt.hasNext(); + break; + case SUMMARY: + summaryIt = currentMetric + .getSummary() + .getDataPointsList() + .iterator(); + res = summaryIt.hasNext(); + break; + default: + throw new UnsupportedOperationException( + String.format( + "Unknown OTLP metric type: '%s'", + currentMetric.getDataCase())); + } + + if (res) { + itemsCount = 0; + metricsCount++; + } + + 
return res; + } + + /** + * Move the nested iterators to next OTLP Metric. + */ + private void nextMetric() { + if (metricIt == null || !metricIt.hasNext()) { + nextScope(); + + while (currentScope != null + && !( + metricIt = currentScope + .getMetricsList() + .iterator() + ).hasNext()) { + nextScope(); + } + } + + currentMetric = metricIt == null || !metricIt.hasNext() + ? null + : metricIt.next(); + } + + /** + * Move the nested iterators to next OTLP Scope Metrics. + */ + private void nextScope() { + if (scopeIt == null || !scopeIt.hasNext()) { + nextResource(); + + while (currentResource != null + && !( + scopeIt = currentResource + .getScopeMetricsList() + .iterator() + ).hasNext()) { + nextResource(); + } + } + + currentScope = scopeIt == null || !scopeIt.hasNext() + ? null + : scopeIt.next(); + } + + /** + * Move the nested iterators to next OTLP Resource Metrics. + */ + private void nextResource() { + if (resourceIt == null) { + resourceIt = request.getResourceMetricsList().iterator(); + } + + currentResource = resourceIt.hasNext() + ? 
resourceIt.next() + : null; + } + } +} diff --git a/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/ProfilesCollector.java b/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/ProfilesCollector.java index f674d59..9296b78 100644 --- a/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/ProfilesCollector.java +++ b/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/ProfilesCollector.java @@ -17,7 +17,6 @@ package io.mishmash.opentelemetry.server.collector; -import java.util.UUID; import java.util.concurrent.ForkJoinPool; import java.util.logging.Level; import java.util.logging.Logger; @@ -29,11 +28,6 @@ import io.opentelemetry.proto.collector.profiles.v1experimental.ExportProfilesPartialSuccess; import io.opentelemetry.proto.collector.profiles.v1experimental.ExportProfilesServiceRequest; import io.opentelemetry.proto.collector.profiles.v1experimental.ExportProfilesServiceResponse; -import io.opentelemetry.proto.profiles.v1experimental.Profile; -import io.opentelemetry.proto.profiles.v1experimental.ProfileContainer; -import io.opentelemetry.proto.profiles.v1experimental.ResourceProfiles; -import io.opentelemetry.proto.profiles.v1experimental.Sample; -import io.opentelemetry.proto.profiles.v1experimental.ScopeProfiles; import io.vertx.core.Vertx; /** @@ -97,150 +91,96 @@ protected Batch loadBatch( Batch batch = new Batch<>(otelContext); - long timestamp = System.currentTimeMillis(); - String uuid = UUID.randomUUID().toString(); - int requestItems = 0; - for (int rpidx = 0; - rpidx < request.getResourceProfilesCount(); - rpidx++) { - ResourceProfiles resourceProfile = - request.getResourceProfiles(rpidx); - - for (int spidx = 0; - spidx < resourceProfile.getScopeProfilesCount(); - spidx++) { - ScopeProfiles scope = - resourceProfile.getScopeProfiles(spidx); - - for (int pcidx = 0; - pcidx < scope.getProfilesCount(); - pcidx++) { - ProfileContainer container = 
scope.getProfiles(pcidx); - Profile prof = container.getProfile(); - - for (int sidx = 0; - sidx < prof.getSampleCount(); - sidx++) { - Sample sample = prof.getSample(sidx); - - for (int vidx = 0; - vidx < sample.getValueCount(); - vidx++) { - if (batch.isCancelled()) { - return batch; - } - - Span recordSpan = getInstrumentation() - .startNewSpan("otel.record"); - - ProfileSampleValue lv = - new ProfileSampleValue( - batch, - Context.current(), - Vertx.currentContext() - .get(VCTX_EMITTER)); - lv.setFrom( - timestamp, - uuid, - rpidx, - resourceProfile, - spidx, - scope, - pcidx, - container, - prof, - sidx, - vidx); - - requestItems++; - - /* - * FIXME: check if is valid and add an - * error message (but still allow it to - * go to subscribers) - */ - - lv.addAll(getSubscribers()); - lv.setLoaded(); - batch.add(lv); - - recordSpan.addEvent("Request item loaded"); - - int estimatedLag = offer( - lv, - (subscriber, droppedItem) -> { - /* - * set an error on this in the - * response, - * FIXME: use another - * exception class - */ - droppedItem - .completeExceptionally( - new RuntimeException( - """ - Profiles \ - collector \ - subscriber """ - + subscriber - + """ - dropped a log \ - record""")); - - // droppedItem - // .complete(subscriber); - // do not retry - return false; - }); - - if (estimatedLag < 0) { - /* - * says how many subscribers dropped - * the message - */ - LOG.info( - String.format(""" - Profiles batch %s has \ - %d drop(s)""", - uuid, - (-estimatedLag))); - addDroppedRequestItems( - (-estimatedLag), - transport, - encoding); - } else if (estimatedLag == 0) { - /* - * there were no subscribers, - * set an error - */ - batch.setLoadFailed( - new IllegalStateException(""" - Profiles collector \ - currently has \ - no subscribers""")); - LOG.log(Level.SEVERE, """ - Profiles batch load failed, \ - profiles collector \ - currently has no subscribers. 
\ - Batch id: """ - + uuid); - - return batch; - // } else { - /* - * positive number is the estimated - * lag - number of items submitted - * but not yet consumed - */ + for (ProfileSampleValue lv : new ProfilesFlattener( + batch, + Context.current(), + request, + Vertx.currentContext() + .get(VCTX_EMITTER))) { + if (batch.isCancelled()) { + return batch; + } - // LOG.info("Logs estimated lag: " - // + estimatedLag); - } - } - } - } + requestItems++; + + /* + * FIXME: check if is valid and add an + * error message (but still allow it to + * go to subscribers) + */ + + lv.addAll(getSubscribers()); + lv.setLoaded(); + batch.add(lv); + + int estimatedLag = offer( + lv, + (subscriber, droppedItem) -> { + /* + * set an error on this in the + * response, + * FIXME: use another + * exception class + */ + droppedItem + .completeExceptionally( + new RuntimeException( + """ + Profiles \ + collector \ + subscriber """ + + subscriber + + """ + dropped a log \ + record""")); + + // droppedItem + // .complete(subscriber); + // do not retry + return false; + }); + + if (estimatedLag < 0) { + /* + * says how many subscribers dropped + * the message + */ + LOG.info( + String.format(""" + Profiles batch has %d drop(s)""", + (-estimatedLag))); + addDroppedRequestItems( + (-estimatedLag), + transport, + encoding); + } else if (estimatedLag == 0) { + /* + * there were no subscribers, + * set an error + */ + batch.setLoadFailed( + new IllegalStateException(""" + Profiles collector \ + currently has \ + no subscribers""")); + LOG.log(Level.SEVERE, """ + Profiles batch load failed, \ + profiles collector \ + currently has no subscribers. 
\ + Batch id: """); + + return batch; + // } else { + /* + * positive number is the estimated + * lag - number of items submitted + * but not yet consumed + */ + + // LOG.info("Logs estimated lag: " + // + estimatedLag); } } diff --git a/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/ProfilesFlattener.java b/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/ProfilesFlattener.java new file mode 100644 index 0000000..7318914 --- /dev/null +++ b/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/ProfilesFlattener.java @@ -0,0 +1,348 @@ +/* + * Copyright 2024 Mishmash IO UK Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.mishmash.opentelemetry.server.collector; + +import java.util.Iterator; +import java.util.UUID; + +import io.opentelemetry.context.Context; +import io.opentelemetry.proto.collector.profiles.v1experimental.ExportProfilesServiceRequest; +import io.opentelemetry.proto.profiles.v1experimental.ProfileContainer; +import io.opentelemetry.proto.profiles.v1experimental.ResourceProfiles; +import io.opentelemetry.proto.profiles.v1experimental.Sample; +import io.opentelemetry.proto.profiles.v1experimental.ScopeProfiles; +import io.vertx.ext.auth.User; + +/** + * Extracts individual profile sample value messages from an OTLP packet. + * + * Turns an {@link ExportProfilesServiceRequest} into + * an {@link Iterable} of {@link ProfileSampleValue}s. 
+ * + * The original OTLP protocol message format, as specified + * by {@link ExportProfilesServiceRequest}, contains lists of + * individual values (of type long). These lists are nested + * into lists of {@link Sample}s, which are in turn + * nested inside a list of {@link ProfileContainer}s, which are + * again nested into a list of {@link ScopeProfiles}, further + * nested into a list of {@link ResourceProfiles}. + * + * In addition, repetitive fields within the same message are + * encoded with the help of lookup tables (or dictionaries) to reduce the + * message size. + * + * To facilitate further processing this class extracts + * (or 'flattens') the above nested structures into a 'flat' + * {@link Iterable} of individual {@link ProfileSampleValue} instances. + * It also performs the necessary lookup to extract the actual values + * of dictionary-encoded fields. + * + * To see examples of 'flattened' data visit + * @see + * OpenTelemetry Basics Notebook on GitHub. + */ +public class ProfilesFlattener implements Iterable { + + /** + * The parent {@link Batch}. + */ + private Batch batch; + /** + * The own telemetry {@link Context}. + */ + private Context otel; + /** + * The OTLP message received from the client. + */ + private ExportProfilesServiceRequest request; + /** + * An optional {@link User} if authentication was enabled. + */ + private User user; + /** + * The timestamp of this batch. + */ + private long timestamp; + /** + * The batch UUID. + */ + private String uuid; + + /** + * Create a {@link ProfileSampleValue}s flattener. 
+ * + * @param parentBatch the parent {@link Batch} + * @param otelContext the own telemetry {@link Context} + * @param profilesRequest the OTLP packet + * @param authUser the {@link User} submitting the request or null + * if authentication was not enabled + */ + public ProfilesFlattener( + final Batch parentBatch, + final Context otelContext, + final ExportProfilesServiceRequest profilesRequest, + final User authUser) { + this.batch = parentBatch; + this.otel = otelContext; + this.request = profilesRequest; + this.user = authUser; + + timestamp = System.currentTimeMillis(); + uuid = UUID.randomUUID().toString(); + } + + /** + * {@inheritDoc} + */ + @Override + public Iterator iterator() { + return new ProfilesIterator(); + } + + /** + * The internal {@link Iterator}, as returned to user. + */ + private final class ProfilesIterator + implements Iterator { + + /** + * Iterator over the OTLP Resource Profiles. + */ + private Iterator resourceIt; + /** + * Iterator over the OTLP Scope Profiles within an OTLP + * Resource Profile. + */ + private Iterator scopeIt; + /** + * Iterator over the OTLP Profile containers within an OTLP + * Scope Profile. + */ + private Iterator profileIt; + /** + * Iterator over the OTLP Sample within an OTLP Profile. + */ + private Iterator sampleIt; + /** + * Iterator over the values of an OTLP Sample. + */ + private Iterator valueIt; + /** + * The current OTLP Resource Profile. + */ + private ResourceProfiles currentResource; + /** + * The current OTLP Scope Profiles. + */ + private ScopeProfiles currentScope; + /** + * The current OTLP Profile Container. + */ + private ProfileContainer currentProfile; + /** + * The current OTLP Sample. + */ + private Sample currentSample; + /** + * A counter of Resource Profiles. + */ + private int resourcesCount; + /** + * A counter of Scope Profiles. + */ + private int scopesCount; + /** + * A counter of Profiles. + */ + private int profilesCount; + /** + * A counter of Samples. 
+ */ + private int samplesCount; + /** + * A counter of values. + */ + private int valuesCount; + /** + * A counter of returned {@link ProfileSampleValue}s. + */ + private int itemsCount; + + /** + * Initialize this iterator. + */ + private ProfilesIterator() { + resourcesCount = -1; + scopesCount = -1; + profilesCount = -1; + samplesCount = -1; + valuesCount = -1; + itemsCount = 0; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean hasNext() { + if (valueIt == null || !valueIt.hasNext()) { + nextSample(); + + while (currentSample != null + && !( + valueIt = currentSample + .getValueList() + .iterator() + ).hasNext()) { + nextSample(); + } + } + + return valueIt != null && valueIt.hasNext(); + } + + /** + * {@inheritDoc} + */ + @Override + public ProfileSampleValue next() { + Long rec = valueIt.next(); + ProfileSampleValue psv = + new ProfileSampleValue(batch, otel, user); + + psv.setFrom( + timestamp, + uuid, + resourcesCount, + currentResource, + scopesCount, + currentScope, + profilesCount, + currentProfile, + currentProfile.getProfile(), + samplesCount, + ++valuesCount); + + itemsCount++; + + return psv; + } + + /** + * Move the nested iterators to next OTLP Sample. + */ + private void nextSample() { + if (sampleIt == null || !sampleIt.hasNext()) { + nextProfile(); + + while (currentProfile != null + && !( + sampleIt = currentProfile + .getProfile() + .getSampleList() + .iterator() + ).hasNext()) { + nextProfile(); + } + } + + if (sampleIt == null || !sampleIt.hasNext()) { + currentSample = null; + } else { + currentSample = sampleIt.next(); + samplesCount++; + valuesCount = -1; + } + } + + /** + * Move the nested iterators to next OTLP Profile Container. 
+ */ + private void nextProfile() { + if (profileIt == null || !profileIt.hasNext()) { + nextScope(); + + while (currentScope != null + && !( + profileIt = currentScope + .getProfilesList() + .iterator() + ).hasNext()) { + nextScope(); + } + } + + if (profileIt == null || !profileIt.hasNext()) { + currentProfile = null; + } else { + currentProfile = profileIt.next(); + profilesCount++; + samplesCount = -1; + valuesCount = -1; + } + } + + /** + * Move the nested iterators to next OTLP Scope Profiles. + */ + private void nextScope() { + if (scopeIt == null || !scopeIt.hasNext()) { + nextResource(); + + while (currentResource != null + && !( + scopeIt = currentResource + .getScopeProfilesList() + .iterator() + ).hasNext()) { + nextResource(); + } + } + + if (scopeIt == null || !scopeIt.hasNext()) { + currentScope = null; + } else { + currentScope = scopeIt.next(); + scopesCount++; + profilesCount = -1; + samplesCount = -1; + valuesCount = -1; + } + } + + /** + * Move the nested iterators to next OTLP Resource Profiles. 
+ */ + private void nextResource() { + if (resourceIt == null) { + resourceIt = request.getResourceProfilesList().iterator(); + } + + if (resourceIt.hasNext()) { + currentResource = resourceIt.next(); + resourcesCount++; + scopesCount = -1; + profilesCount = -1; + samplesCount = -1; + valuesCount = -1; + } else { + currentResource = null; + } + } + } +} diff --git a/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/TracesCollector.java b/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/TracesCollector.java index 195a2a3..341c00c 100644 --- a/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/TracesCollector.java +++ b/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/TracesCollector.java @@ -17,7 +17,6 @@ package io.mishmash.opentelemetry.server.collector; -import java.util.UUID; import java.util.concurrent.ForkJoinPool; import java.util.logging.Level; import java.util.logging.Logger; @@ -28,8 +27,6 @@ import io.opentelemetry.proto.collector.trace.v1.ExportTracePartialSuccess; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse; -import io.opentelemetry.proto.trace.v1.ResourceSpans; -import io.opentelemetry.proto.trace.v1.ScopeSpans; import io.vertx.core.Vertx; /** @@ -93,99 +90,78 @@ public Batch loadBatch( Batch batch = new Batch<>(otelContext); - long timestamp = System.currentTimeMillis(); - String uuid = UUID.randomUUID().toString(); - int requestItems = 0; - for (ResourceSpans spans : request.getResourceSpansList()) { - for (ScopeSpans scope : spans.getScopeSpansList()) { - for (io.opentelemetry.proto.trace.v1.Span span - : scope.getSpansList()) { - if (batch.isCancelled()) { - return batch; - } - - io.opentelemetry.api.trace.Span recordOtelSpan = - getInstrumentation() - .startNewSpan("otel.record"); - - Span s = new Span(batch, - otelContext, - 
Vertx.currentContext().get(VCTX_EMITTER)); - s.setFrom( - timestamp, - uuid, - requestItems++, - spans, - scope, - span); - - /* - * FIXME: check if is valid and add an error message - * (but still allow it to go to subscribers - */ - - s.addAll(getSubscribers()); - s.setLoaded(); - batch.add(s); - - recordOtelSpan.addEvent("Request item loaded"); - - int estimatedLag = offer( - s, - (subscriber, droppedItem) -> { - /* - * set an error on this in the response, - * FIXME: use another exception class - */ - droppedItem.completeExceptionally( - new RuntimeException( - "Traces collector subscriber " - + subscriber - + " dropped a span record") - ); - - // do not retry - return false; - }); - - if (estimatedLag < 0) { - // says how many subscribers dropped the message - LOG.info( - String.format( - "Traces batch %s has %d drop(s)", - uuid, - (-estimatedLag))); - - addDroppedRequestItems( - (-estimatedLag), - transport, - encoding); - } else if (estimatedLag == 0) { - // there were no subscribers, set an error - batch.setLoadFailed( - new IllegalStateException(""" - Traces collector currently has \ - no subscribers""")); - - LOG.log(Level.SEVERE, - """ - Traces batch load failed, traces \ - collector currently has no \ - subscribers. 
Batch id: """ - + uuid); - - return batch; - // } else { + for (Span s : new TracesFlattener( + batch, + Context.current(), + request, + Vertx.currentContext().get(VCTX_EMITTER))) { + if (batch.isCancelled()) { + return batch; + } + + /* + * FIXME: check if is valid and add an error message + * (but still allow it to go to subscribers + */ + + s.addAll(getSubscribers()); + s.setLoaded(); + batch.add(s); + + requestItems++; + + int estimatedLag = offer( + s, + (subscriber, droppedItem) -> { /* - * positive number is the estimated lag - number - * of items submitted but not yet consumed + * set an error on this in the response, + * FIXME: use another exception class */ - - // LOG.info("Traces estimated lag: " + estimatedLag); - } - } + droppedItem.completeExceptionally( + new RuntimeException( + "Traces collector subscriber " + + subscriber + + " dropped a span record") + ); + + // do not retry + return false; + }); + + if (estimatedLag < 0) { + // says how many subscribers dropped the message + LOG.info( + String.format( + "Traces batch has %d drop(s)", + (-estimatedLag))); + + addDroppedRequestItems( + (-estimatedLag), + transport, + encoding); + } else if (estimatedLag == 0) { + // there were no subscribers, set an error + batch.setLoadFailed( + new IllegalStateException(""" + Traces collector currently has \ + no subscribers""")); + + LOG.log(Level.SEVERE, + """ + Traces batch load failed, traces \ + collector currently has no \ + subscribers. 
Batch id: """); + + return batch; + // } else { + /* + * positive number is the estimated lag - number + * of items submitted but not yet consumed + */ + + // LOG.info("Traces estimated lag: " + estimatedLag); } } diff --git a/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/TracesFlattener.java b/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/TracesFlattener.java new file mode 100644 index 0000000..356f517 --- /dev/null +++ b/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/TracesFlattener.java @@ -0,0 +1,219 @@ +/* + * Copyright 2024 Mishmash IO UK Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.mishmash.opentelemetry.server.collector; + +import java.util.Iterator; +import java.util.UUID; + +import io.opentelemetry.context.Context; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; +import io.opentelemetry.proto.trace.v1.ResourceSpans; +import io.opentelemetry.proto.trace.v1.ScopeSpans; +import io.vertx.ext.auth.User; + +/** + * Extracts individual spans from an OTLP packet. + * + * Turns an {@link ExportTraceServiceRequest} into + * an {@link Iterable} of {@link Span}s. + * + * The original OTLP protocol message format, as specified + * by {@link ExportTraceServiceRequest}, contains lists of + * individual {@link io.opentelemetry.proto.trace.v1.Span}s. 
+ * These lists are nested into lists of {@link ScopeSpans}, + * which are in turn nested inside a list of {@link ResourceSpans}. + * + * To facilitate further processing this class extracts + * (or 'flattens') the above nested structures into a 'flat' + * {@link Iterable} of individual {@link Span} instances. + * + * To see examples of 'flattened' data visit + * @see + * OpenTelemetry Basics Notebook on GitHub. + */ +public class TracesFlattener implements Iterable { + + /** + * The parent {@link Batch}. + */ + private Batch batch; + /** + * The own telemetry {@link Context}. + */ + private Context otel; + /** + * The OTLP message received from the client. + */ + private ExportTraceServiceRequest request; + /** + * An optional {@link User} if authentication was enabled. + */ + private User user; + /** + * The timestamp of this batch. + */ + private long timestamp; + /** + * The batch UUID. + */ + private String uuid; + + /** + * Create a {@link Span}s flattener. + * + * @param parentBatch the parent {@link Batch} + * @param otelContext the own telemetry {@link Context} + * @param tracesRequest the OTLP packet + * @param authUser the {@link User} submitting the request or null + * if authentication was not enabled + */ + public TracesFlattener( + final Batch parentBatch, + final Context otelContext, + final ExportTraceServiceRequest tracesRequest, + final User authUser) { + this.batch = parentBatch; + this.otel = otelContext; + this.request = tracesRequest; + this.user = authUser; + + timestamp = System.currentTimeMillis(); + uuid = UUID.randomUUID().toString(); + } + + /** + * {@inheritDoc} + */ + @Override + public Iterator iterator() { + return new SpansIterator(); + } + + /** + * The internal {@link Iterator}, as returned to user. + */ + private final class SpansIterator implements Iterator { + + /** + * Iterator over the OTLP Resource Spans. + */ + private Iterator resourceIt; + /** + * Iterator over the OTLP Scope Spans within OTLP Resource Spans. 
+ */ + private Iterator scopeIt; + /** + * Iterator over individual OTLP Spans within a scope. + */ + private Iterator spanIt; + /** + * The current OTLP Resource Spans. + */ + private ResourceSpans currentResource; + /** + * The current OTLP Scope Spans. + */ + private ScopeSpans currentScope; + /** + * A counter of returned {@link Span}s. + */ + private int itemsCount; + + /** + * Initialize this iterator. + */ + private SpansIterator() { + itemsCount = 0; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean hasNext() { + if (spanIt == null || !spanIt.hasNext()) { + nextScope(); + + while (currentScope != null + && !( + spanIt = currentScope + .getSpansList() + .iterator() + ).hasNext()) { + nextScope(); + } + } + + return spanIt != null && spanIt.hasNext(); + } + + /** + * {@inheritDoc} + */ + @Override + public Span next() { + io.opentelemetry.proto.trace.v1.Span rec = spanIt.next(); + Span span = new Span(batch, otel, user); + + span.setFrom( + timestamp, + uuid, + itemsCount++, + currentResource, + currentScope, + rec); + + return span; + } + + /** + * Move the nested iterators to next OTLP Scope Spans. + */ + private void nextScope() { + if (scopeIt == null || !scopeIt.hasNext()) { + nextResource(); + + while (currentResource != null + && !( + scopeIt = currentResource + .getScopeSpansList() + .iterator() + ).hasNext()) { + nextResource(); + } + } + + currentScope = scopeIt == null || !scopeIt.hasNext() + ? null + : scopeIt.next(); + } + + /** + * Move the nested iterators to next OTLP Resource Spans. + */ + private void nextResource() { + if (resourceIt == null) { + resourceIt = request.getResourceSpansList().iterator(); + } + + currentResource = resourceIt.hasNext() + ? 
resourceIt.next() + : null; + } + } +} diff --git a/collector-embedded/src/test/java/io/mishmash/opentelemetry/server/collector/LogsFlattenerTests.java b/collector-embedded/src/test/java/io/mishmash/opentelemetry/server/collector/LogsFlattenerTests.java new file mode 100644 index 0000000..c8c600a --- /dev/null +++ b/collector-embedded/src/test/java/io/mishmash/opentelemetry/server/collector/LogsFlattenerTests.java @@ -0,0 +1,230 @@ +/* + * Copyright 2024 Mishmash IO UK Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package io.mishmash.opentelemetry.server.collector; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Arrays; +import java.util.Iterator; + +import org.junit.jupiter.api.Test; + +import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.logs.v1.LogRecord; +import io.opentelemetry.proto.logs.v1.ResourceLogs; +import io.opentelemetry.proto.logs.v1.ScopeLogs; +import io.opentelemetry.proto.resource.v1.Resource; + +public class LogsFlattenerTests { + + @Test + void emptyLogs() { + // test with completely empty request + LogsFlattener flattener = new LogsFlattener( + null, + null, + emptyRequest().build(), + null); + + assertFalse(flattener.iterator().hasNext()); + + // test with a request with an empty ResourceLogs + flattener = new LogsFlattener( + null, + null, + emptyResourceLogs() + .build(), + null); + + assertFalse(flattener.iterator().hasNext()); + + // test with a request with an empty ScopeLogs + flattener = new LogsFlattener( + null, + null, + emptyScopeLogs() + .build(), + null); + + assertFalse(flattener.iterator().hasNext()); + } + + @Test + void singleLog() { + // test with a single log record + LogsFlattener flattener = new LogsFlattener( + null, + null, + createLogsRequest( + null, + null, + LogRecord.newBuilder()) + .build(), + null); + Iterator it = flattener.iterator(); + + assertTrue(it.hasNext()); + assertNotNull(it.next()); + assertFalse(it.hasNext()); + + // test with a single record and then an empty resource + flattener = new LogsFlattener( + null, + null, + addEmptyResourceLogs( + createLogsRequest( + null, + null, + LogRecord.newBuilder()) + ).build(), + null); + it = flattener.iterator(); + + assertTrue(it.hasNext()); + 
assertNotNull(it.next()); + assertFalse(it.hasNext()); + + // test with a single record and then an empty scope + flattener = new LogsFlattener( + null, + null, + addEmptyScopeLogs( + createLogsRequest( + null, + null, + LogRecord.newBuilder()) + ).build(), + null); + it = flattener.iterator(); + + assertTrue(it.hasNext()); + assertNotNull(it.next()); + assertFalse(it.hasNext()); + + // test with a single record and then an empty resource and then an empty scope + flattener = new LogsFlattener( + null, + null, + addEmptyScopeLogs( + addEmptyResourceLogs( + createLogsRequest( + null, + null, + LogRecord.newBuilder())) + ).build(), + null); + it = flattener.iterator(); + + assertTrue(it.hasNext()); + assertNotNull(it.next()); + assertFalse(it.hasNext()); + + } + + private ExportLogsServiceRequest.Builder emptyRequest() { + return createLogsRequest(); + } + + private ExportLogsServiceRequest.Builder emptyResourceLogs() { + return addEmptyResourceLogs(createLogsRequest()); + } + + private ExportLogsServiceRequest.Builder addEmptyResourceLogs( + ExportLogsServiceRequest.Builder builder) { + return builder.addResourceLogs(createResourceLogs(null, null)); + } + + private ExportLogsServiceRequest.Builder emptyScopeLogs() { + return addEmptyScopeLogs(createLogsRequest()); + } + + private ExportLogsServiceRequest.Builder addEmptyScopeLogs( + ExportLogsServiceRequest.Builder builder) { + return builder.addResourceLogs( + addEmptyScopeLogs(createResourceLogs(null, null))); + } + + private ResourceLogs.Builder addEmptyScopeLogs(ResourceLogs.Builder builder) { + return builder.addScopeLogs(ScopeLogs.newBuilder()); + } + + private ResourceLogs.Builder createResourceLogs( + String schemaUrl, + Integer droppedAttributesCnt, + KeyValue.Builder...attributes) { + ResourceLogs.Builder res = ResourceLogs.newBuilder(); + + if (schemaUrl != null) { + res = res.setSchemaUrl(schemaUrl); + } + + if (attributes == null && droppedAttributesCnt == null) { + return res; + } + + Resource.Builder 
rb = Resource.newBuilder(); + + if (droppedAttributesCnt != null) { + rb = rb.setDroppedAttributesCount(droppedAttributesCnt); + } + + if (attributes != null) { + rb = rb.addAllAttributes( + Arrays.stream(attributes) + .map(a -> a.build()) + .toList()); + } + + return res.setResource(rb); + } + + private ExportLogsServiceRequest.Builder createLogsRequest() { + return ExportLogsServiceRequest.newBuilder(); + } + + private ExportLogsServiceRequest.Builder createLogsRequest( + ResourceLogs.Builder resource, + ScopeLogs.Builder scope, + LogRecord.Builder...logs) { + if (resource == null) { + resource = createResourceLogs(null, null); + } + + return createLogsRequest() + .addResourceLogs(resource + .addScopeLogs( + addLogs(scope, logs + ))); + } + + private ScopeLogs.Builder addLogs( + ScopeLogs.Builder scope, + LogRecord.Builder...logs) { + if (scope == null) { + scope = ScopeLogs.newBuilder(); + } + + return scope.addAllLogRecords(Arrays.stream(logs) + .map(l -> l.build()) + .toList()); + } +} diff --git a/mod_sun_checks.xml b/mod_sun_checks.xml index 0962754..7412492 100644 --- a/mod_sun_checks.xml +++ b/mod_sun_checks.xml @@ -73,6 +73,7 @@ + @@ -168,7 +169,7 @@ - + diff --git a/pom.xml b/pom.xml index 8a50716..6a09a79 100644 --- a/pom.xml +++ b/pom.xml @@ -73,6 +73,7 @@ 4.5.10 1.42.1 1.3.2-alpha + 5.11.1 @@ -113,6 +114,13 @@ opentelemetry-proto ${opentelemetry.proto.version} + + + org.junit.jupiter + junit-jupiter-engine + ${junit5.version} + test + @@ -171,6 +179,11 @@ + + org.apache.maven.plugins + maven-surefire-plugin + 3.5.0 + From a97e82553bdd006eed91981ed79e2df80b205b83 Mon Sep 17 00:00:00 2001 From: Andrey Rusev Date: Tue, 8 Oct 2024 16:41:20 +0300 Subject: [PATCH 2/2] Publish Apache Druid input source skeleton --- README.md | 19 +- .../server/collector/LogsFlattener.java | 54 +++++ .../server/collector/MetricsFlattener.java | 54 +++++ .../server/collector/ProfilesFlattener.java | 54 +++++ .../server/collector/TracesFlattener.java | 54 +++++ 
druid-input-format/README.md | 3 + druid-input-format/pom.xml | 119 +++++++++++ .../druid/format/BaseIterator.java | 92 ++++++++ .../druid/format/FlatIterator.java | 70 ++++++ .../druid/format/LogsReader.java | 165 ++++++++++++++ .../druid/format/MetricsReader.java | 169 +++++++++++++++ .../druid/format/OTLPInputFormat.java | 180 ++++++++++++++++ .../druid/format/OTLPInputFormatModule.java | 59 +++++ .../druid/format/ProfilesReader.java | 172 +++++++++++++++ .../druid/format/RawIterator.java | 76 +++++++ .../druid/format/TracesReader.java | 165 ++++++++++++++ .../druid/format/package-info.java | 48 +++++ ...rg.apache.druid.initialization.DruidModule | 15 ++ .../persistence/proto/ProtobufLogs.java | 13 ++ .../persistence/proto/ProtobufMetrics.java | 14 ++ .../persistence/proto/ProtobufProfiles.java | 13 ++ .../persistence/proto/ProtobufSpans.java | 14 ++ .../persistence/proto/ProtobufUtils.java | 202 ++++++++++++++++++ pom.xml | 1 + 24 files changed, 1824 insertions(+), 1 deletion(-) create mode 100644 druid-input-format/README.md create mode 100644 druid-input-format/pom.xml create mode 100644 druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/BaseIterator.java create mode 100644 druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/FlatIterator.java create mode 100644 druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/LogsReader.java create mode 100644 druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/MetricsReader.java create mode 100644 druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/OTLPInputFormat.java create mode 100644 druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/OTLPInputFormatModule.java create mode 100644 druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/ProfilesReader.java create mode 100644 druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/RawIterator.java create mode 100644 
druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/TracesReader.java create mode 100644 druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/package-info.java create mode 100644 druid-input-format/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule create mode 100644 persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufUtils.java diff --git a/README.md b/README.md index c9dc795..d4dbd8f 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,24 @@ Parquet files as saved by this Stand-alone server. - [README](./server-parquet) - [Javadoc on javadoc.io](https://javadoc.io/doc/io.mishmash.opentelemetry/server-parquet) +## Apache Druid Input Format + +Use this artifact when ingesting OpenTelemetry signals into [Apache Druid](https://druid.apache.org), in combination with an Input Source (like Apache Kafka or other). + +Apache Druid is a high performance, real-time analytics database that delivers sub-second queries on streaming and batch data at scale and under load. This makes it perfect for OpenTelemetry data analytics. + +With this OTLP Input Format you can build OpenTelemetry ingestion pipelines into Apache +Druid. For example: +- Use the [OpenTelemetry Kafka Exporter](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/kafkaexporter/README.md) to publish +OTLP signals to an Apache Kafka topic, then the [Druid Kafka Ingestion](https://druid.apache.org/docs/latest/ingestion/kafka-ingestion/) with this Input Format to get Druid +tables with your telemetry. +- In a similar way you can also use other Druid input sources developed by mishmash io - +like with [Apache BookKeeper](https://bookkeeper.apache.org) or [Apache Pulsar](https://pulsar.apache.org). For details - check the related artifact documentation. 
+ +Find out more about the OTLP Input Format for Apache Druid: +- [README](./druid-input-format) +- [Javadoc on javadoc.io](https://javadoc.io/doc/io.mishmash.opentelemetry/druid-input-format) + # OpenTelemetry at mishmash io OpenTelemetry's main intent is the observability of production environments, but at [mishmash io](https://mishmash.io) it is part of our software development process. By saving telemetry from **experiments** and **tests** of @@ -44,4 +62,3 @@ We believe that adopting OpenTelemetry as a software development tool might be u Learn more about the broader set of [OpenTelemetry-related activities](https://mishmash.io/open_source/opentelemetry) at [mishmash io](https://mishmash.io/) and `follow` [GitHub profile](https://github.com/mishmash-io) for updates and new releases. - diff --git a/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/LogsFlattener.java b/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/LogsFlattener.java index c785e39..5ffcaf5 100644 --- a/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/LogsFlattener.java +++ b/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/LogsFlattener.java @@ -97,6 +97,60 @@ public LogsFlattener( uuid = UUID.randomUUID().toString(); } + /** + * Get the configured Logs {@link Batch}, if any. + * + * @return the batch or null if not set + */ + public Batch getBatch() { + return batch; + } + + /** + * Get the own telemetry context, if any. + * + * @return the {@link Context} or null if not set + */ + public Context getOtel() { + return otel; + } + + /** + * Get the parsed protobuf Logs request. + * + * @return the {@link ExportLogsServiceRequest} message + */ + public ExportLogsServiceRequest getRequest() { + return request; + } + + /** + * Get the authenticated user who submitted this message. 
+ * + * @return the {@link User} or null if authentication was not enabled + */ + public User getUser() { + return user; + } + + /** + * Get the timestamp used by this flattener. + * + * @return the timestamp in milliseconds + */ + public long getTimestamp() { + return timestamp; + } + + /** + * Get the unique UUID used by this flattener. + * + * @return the UUID + */ + public String getUuid() { + return uuid; + } + /** * {@inheritDoc} */ diff --git a/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/MetricsFlattener.java b/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/MetricsFlattener.java index a52b647..2a3c8da 100644 --- a/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/MetricsFlattener.java +++ b/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/MetricsFlattener.java @@ -108,6 +108,60 @@ public MetricsFlattener( uuid = UUID.randomUUID().toString(); } + /** + * Get the configured Metrics {@link Batch}, if any. + * + * @return the batch or null if not set + */ + public Batch getBatch() { + return batch; + } + + /** + * Get the own telemetry context, if any. + * + * @return the {@link Context} or null if not set + */ + public Context getOtel() { + return otel; + } + + /** + * Get the parsed protobuf Metrics request. + * + * @return the {@link ExportMetricsServiceRequest} message + */ + public ExportMetricsServiceRequest getRequest() { + return request; + } + + /** + * Get the authenticated user who submitted this message. + * + * @return the {@link User} or null if authentication was not enabled + */ + public User getUser() { + return user; + } + + /** + * Get the timestamp used by this flattener. + * + * @return the timestamp in milliseconds + */ + public long getTimestamp() { + return timestamp; + } + + /** + * Get the unique UUID used by this flattener. 
+ * + * @return the UUID + */ + public String getUuid() { + return uuid; + } + /** * {@inheritDoc} */ diff --git a/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/ProfilesFlattener.java b/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/ProfilesFlattener.java index 7318914..710552c 100644 --- a/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/ProfilesFlattener.java +++ b/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/ProfilesFlattener.java @@ -106,6 +106,60 @@ public ProfilesFlattener( uuid = UUID.randomUUID().toString(); } + /** + * Get the configured Profiles {@link Batch}, if any. + * + * @return the batch or null if not set + */ + public Batch getBatch() { + return batch; + } + + /** + * Get the own telemetry context, if any. + * + * @return the {@link Context} or null if not set + */ + public Context getOtel() { + return otel; + } + + /** + * Get the parsed protobuf Profiles request. + * + * @return the {@link ExportProfilesServiceRequest} message + */ + public ExportProfilesServiceRequest getRequest() { + return request; + } + + /** + * Get the authenticated user who submitted this message. + * + * @return the {@link User} or null if authentication was not enabled + */ + public User getUser() { + return user; + } + + /** + * Get the timestamp used by this flattener. + * + * @return the timestamp in milliseconds + */ + public long getTimestamp() { + return timestamp; + } + + /** + * Get the unique UUID used by this flattener. 
 + * + * @return the UUID + */ + public String getUuid() { + return uuid; + } + /** * {@inheritDoc} */ diff --git a/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/TracesFlattener.java b/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/TracesFlattener.java index 356f517..f933976 100644 --- a/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/TracesFlattener.java +++ b/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/TracesFlattener.java @@ -96,6 +96,60 @@ public TracesFlattener( uuid = UUID.randomUUID().toString(); } + /** + * Get the configured Spans {@link Batch}, if any. + * + * @return the batch or null if not set + */ + public Batch getBatch() { + return batch; + } + + /** + * Get the own telemetry context, if any. + * + * @return the {@link Context} or null if not set + */ + public Context getOtel() { + return otel; + } + + /** + * Get the parsed protobuf Traces request. + * + * @return the {@link ExportTraceServiceRequest} message + */ + public ExportTraceServiceRequest getRequest() { + return request; + } + + /** + * Get the authenticated user who submitted this message. + * + * @return the {@link User} or null if authentication was not enabled + */ + public User getUser() { + return user; + } + + /** + * Get the timestamp used by this flattener. + * + * @return the timestamp in milliseconds + */ + public long getTimestamp() { + return timestamp; + } + + /** + * Get the unique UUID used by this flattener. + * + * @return the UUID + */ + public String getUuid() { + return uuid; + } + /** * {@inheritDoc} */ diff --git a/druid-input-format/README.md b/druid-input-format/README.md new file mode 100644 index 0000000..47ba099 --- /dev/null +++ b/druid-input-format/README.md @@ -0,0 +1,3 @@ +# Apache Druid Input Format for OpenTelemetry signals + +Coming soon!
diff --git a/druid-input-format/pom.xml b/druid-input-format/pom.xml new file mode 100644 index 0000000..580bec4 --- /dev/null +++ b/druid-input-format/pom.xml @@ -0,0 +1,119 @@ + + + + 4.0.0 + + + io.mishmash.opentelemetry + opentelemetry-server-embedded + 1.1.3 + + + druid-input-format + jar + + An OpenTelemetry InputFormat for Apache Druid Data Sources + + An OpenTelemetry input format that can be used to build Apache Druid data ingestion jobs. + + https://mishmash.io/open_source/opentelemetry + + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + + + + + mishmash io + https://mishmash.io + + + + scm:git:https://github.com/mishmash-io/opentelemetry-server-embedded.git + scm:git:https://github.com/mishmash-io/opentelemetry-server-embedded.git + https://github.com/mishmash-io/opentelemetry-server-embedded + + + + + Ivan Kountchev + i.kountchev@mishmash.io + mishmash io + https://mishmash.io + + developer + + + + Andrey Rusev + a.rusev@mishmash.io + www.linkedin.com/in/andrey-rusev-21894172 + mishmash io + https://mishmash.io + + architect + + + + + + 30.0.1 + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + org.apache.maven.plugins + maven-source-plugin + + + org.apache.maven.plugins + maven-javadoc-plugin + + + org.apache.maven.plugins + maven-checkstyle-plugin + + + + + + + io.mishmash.opentelemetry + collector-embedded + ${project.version} + + + io.mishmash.opentelemetry + persistence-protobuf + ${project.version} + + + org.apache.druid + druid-processing + ${druid.version} + provided + + + + diff --git a/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/BaseIterator.java b/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/BaseIterator.java new file mode 100644 index 0000000..1e9c267 --- /dev/null +++ b/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/BaseIterator.java @@ -0,0 +1,92 @@ +/* + * Copyright 2024 Mishmash IO UK Ltd. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.mishmash.opentelemetry.druid.format; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.druid.java.util.common.parsers.CloseableIteratorWithMetadata; + +/** + * A base implementation of {@link CloseableIteratorWithMetadata}. + * + * Allows metadata to be computed before {@link #next()} returns. + * + * @param the iterator element type + */ +public abstract class BaseIterator + implements CloseableIteratorWithMetadata { + + /** + * Holds the metadata. + */ + private Map meta; + + /** + * Construct a new {@link BaseIterator}. + */ + public BaseIterator() { + meta = new HashMap<>(); + } + + /** + * Build the metadata for the element. + * + * @param element the next element to be returned by this iterator + */ + public abstract void initMetaFor(T element); + + /** + * Set a metadata field. + * + * @param key the metadata field key + * @param value the value + */ + public void setMeta( + final String key, + final Object value) { + meta.put(key, value); + } + + /** + * Clear all metadata currently held by this iterator. + */ + public void clearMeta() { + meta.clear(); + } + + /** + * {@inheritDoc} + */ + @Override + public void close() throws IOException { + /* + * nothing to do, input is closed immediately + * when this object was instantiated. 
+ */ + } + + /** + * {@inheritDoc} + */ + @Override + public Map currentMetadata() { + return meta; + } +} diff --git a/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/FlatIterator.java b/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/FlatIterator.java new file mode 100644 index 0000000..a26e9ba --- /dev/null +++ b/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/FlatIterator.java @@ -0,0 +1,70 @@ +/* + * Copyright 2024 Mishmash IO UK Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.mishmash.opentelemetry.druid.format; + +import java.util.NoSuchElementException; + +/** + * A {@link BaseIterator} that iterates over a single element. + * + * @param the iterator element type + */ +public abstract class FlatIterator extends BaseIterator { + + /** + * The single value to be returned by this iterator. + */ + private T value; + /** + * A flag if the value was already returned or not. + */ + private boolean hasNext = true; + + /** + * Create a new {@link FlatIterator}. 
+ * + * @param element the element that will be returned by this iterator + */ + public FlatIterator(final T element) { + value = element; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean hasNext() { + return hasNext; + } + + /** + * {@inheritDoc} + */ + @Override + public T next() { + if (!hasNext) { + throw new NoSuchElementException(); + } + + hasNext = false; + + initMetaFor(value); + + return value; + } +} diff --git a/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/LogsReader.java b/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/LogsReader.java new file mode 100644 index 0000000..4556b81 --- /dev/null +++ b/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/LogsReader.java @@ -0,0 +1,165 @@ +/* + * Copyright 2024 Mishmash IO UK Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package io.mishmash.opentelemetry.druid.format; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.IntermediateRowParsingReader; +import org.apache.druid.java.util.common.parsers.CloseableIteratorWithMetadata; +import org.apache.druid.java.util.common.parsers.ParseException; + +import io.mishmash.opentelemetry.persistence.proto.ProtobufLogs; +import io.mishmash.opentelemetry.persistence.proto.v1.LogsPersistenceProto.PersistedLog; +import io.mishmash.opentelemetry.server.collector.Log; +import io.mishmash.opentelemetry.server.collector.LogsFlattener; +import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; + +/** + * An {@link IntermediateRowParsingReader} for OpenTelemetry logs signals. + * + * Takes an {@link InputEntity} containing a 'raw' protobuf OTLP + * {@link ExportLogsServiceRequest} and 'flattens' it using + * {@link LogsFlattener}, or takes a single (already flattened) + * {@link PersistedLog} protobuf message. + */ +public class LogsReader extends IntermediateRowParsingReader { + + /** + * The source to read data from. + */ + private InputEntity source; + /** + * True if the 'raw' format was configured. + */ + private boolean isRaw = false; + + /** + * Create an OTLP logs reader. 
+ * + * @param input the {@link InputEntity} containing protobuf-encoded bytes + * @param isRawFormat true if input contains a 'raw' + * {@link ExportLogsServiceRequest} + */ + public LogsReader( + final InputEntity input, + final boolean isRawFormat) { + this.source = input; + this.isRaw = isRawFormat; + } + + /** + * {@inheritDoc} + */ + @Override + protected List parseInputRows( + final PersistedLog intermediateRow) + throws IOException, ParseException { + // TODO Auto-generated method stub + return null; + } + + /** + * {@inheritDoc} + */ + @Override + protected List> toMap( + final PersistedLog intermediateRow) throws IOException { + return Collections.singletonList( + ProtobufLogs.toJsonMap(intermediateRow)); + } + + /** + * {@inheritDoc} + */ + @Override + protected CloseableIteratorWithMetadata + intermediateRowIteratorWithMetadata() throws IOException { + try (InputStream is = source().open()) { + if (isRaw) { + ExportLogsServiceRequest req = + ExportLogsServiceRequest.parseFrom(is); + + return new RawIterator<>( + new LogsFlattener(null, null, req, null)) { + @Override + public PersistedLog convert(final Log element) { + return ProtobufLogs.buildLog(element).build(); + } + + @Override + public void initMetaFor( + final PersistedLog element) { + initIteratorMeta(this, element); + } + }; + } else { + PersistedLog log = PersistedLog.parseFrom(is); + + return new FlatIterator<>(log) { + @Override + public void initMetaFor( + final PersistedLog element) { + initIteratorMeta(this, element); + } + }; + } + } + } + + /** + * {@inheritDoc} + */ + @Override + protected String intermediateRowAsString(final PersistedLog row) { + return row.toString(); + } + + /** + * {@inheritDoc} + */ + @Override + protected InputEntity source() { + return source; + } + + /** + * Initialize the metadata associated with the given log. + * + * A {@link CloseableIteratorWithMetadata} has to provide metadata + * for each element it returns through its next() method. 
This + * metadata is computed here, before next() returns. + * + * @param it - the iterator requesting metadata + * @param log - the new element to be returned by next() + */ + protected void initIteratorMeta( + final BaseIterator it, + final PersistedLog log) { + it.clearMeta(); + + it.setMeta("batchUUID", log.getBatchUUID()); + it.setMeta("batchTimestamp", log.getBatchTimestamp()); + it.setMeta("seqNo", log.getSeqNo()); + } +} diff --git a/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/MetricsReader.java b/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/MetricsReader.java new file mode 100644 index 0000000..4040e60 --- /dev/null +++ b/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/MetricsReader.java @@ -0,0 +1,169 @@ +/* + * Copyright 2024 Mishmash IO UK Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package io.mishmash.opentelemetry.druid.format; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.IntermediateRowParsingReader; +import org.apache.druid.java.util.common.parsers.CloseableIteratorWithMetadata; +import org.apache.druid.java.util.common.parsers.ParseException; + +import io.mishmash.opentelemetry.persistence.proto.ProtobufMetrics; +import io.mishmash.opentelemetry.persistence.proto.v1.MetricsPersistenceProto.PersistedMetric; +import io.mishmash.opentelemetry.server.collector.MetricDataPoint; +import io.mishmash.opentelemetry.server.collector.MetricsFlattener; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; + +/** + * An {@link IntermediateRowParsingReader} for OpenTelemetry metrics signals. + * + * Takes an {@link InputEntity} containing a 'raw' protobuf OTLP + * {@link ExportMetricsServiceRequest} and 'flattens' it using + * {@link MetricsFlattener}, or takes a single (already flattened) + * {@link PersistedMetric} protobuf message. + */ +public class MetricsReader + extends IntermediateRowParsingReader { + + /** + * The source to read data from. + */ + private InputEntity source; + /** + * True if the 'raw' format was configured. + */ + private boolean isRaw = false; + + /** + * Create an OTLP metrics reader. 
+ * + * @param input the {@link InputEntity} containing protobuf-encoded bytes + * @param isRawFormat true if input contains a 'raw' + * {@link ExportMetricsServiceRequest} + */ + public MetricsReader( + final InputEntity input, + final boolean isRawFormat) { + this.source = input; + this.isRaw = isRawFormat; + } + + /** + * {@inheritDoc} + */ + @Override + protected List parseInputRows( + final PersistedMetric intermediateRow) + throws IOException, ParseException { + // TODO Auto-generated method stub + return null; + } + + /** + * {@inheritDoc} + */ + @Override + protected List> toMap( + final PersistedMetric intermediateRow) throws IOException { + return Collections.singletonList( + ProtobufMetrics.toJsonMap(intermediateRow)); + } + + /** + * {@inheritDoc} + */ + @Override + protected CloseableIteratorWithMetadata + intermediateRowIteratorWithMetadata() throws IOException { + try (InputStream is = source().open()) { + if (isRaw) { + ExportMetricsServiceRequest req = + ExportMetricsServiceRequest.parseFrom(is); + + return new RawIterator<>( + new MetricsFlattener(null, null, req, null)) { + @Override + public PersistedMetric convert( + final MetricDataPoint element) { + return ProtobufMetrics + .buildMetricDataPoint(element).build(); + } + + @Override + public void initMetaFor( + final PersistedMetric element) { + initIteratorMeta(this, element); + } + }; + } else { + PersistedMetric m = PersistedMetric.parseFrom(is); + + return new FlatIterator<>(m) { + @Override + public void initMetaFor( + final PersistedMetric element) { + initIteratorMeta(this, element); + } + }; + } + } + } + + /** + * {@inheritDoc} + */ + @Override + protected String intermediateRowAsString(final PersistedMetric row) { + return row.toString(); + } + + /** + * {@inheritDoc} + */ + @Override + protected InputEntity source() { + return source; + } + + /** + * Initialize the metadata associated with the given metric data point. 
+ * + * A {@link CloseableIteratorWithMetadata} has to provide metadata + * for each element it returns through its next() method. This + * metadata is computed here, before next() returns. + * + * @param it - the iterator requesting metadata + * @param metric - the new element to be returned by next() + */ + protected void initIteratorMeta( + final BaseIterator it, + final PersistedMetric metric) { + it.clearMeta(); + + it.setMeta("batchUUID", metric.getBatchUUID()); + it.setMeta("batchTimestamp", metric.getBatchTimestamp()); + it.setMeta("metricSeqNo", metric.getSeqNo()); + it.setMeta("dataPointSeqNo", metric.getDatapointSeqNo()); + } +} diff --git a/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/OTLPInputFormat.java b/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/OTLPInputFormat.java new file mode 100644 index 0000000..3cd0143 --- /dev/null +++ b/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/OTLPInputFormat.java @@ -0,0 +1,180 @@ +/* + * Copyright 2024 Mishmash IO UK Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package io.mishmash.opentelemetry.druid.format; + +import java.io.File; +import java.util.Objects; + +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRowSchema; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * The main class of the OTLP Apache Druid OTLP {@link InputFormat}. + */ +public class OTLPInputFormat implements InputFormat { + + /** + * Specifies the format of the input data. + */ + public enum OTLPSignalType { + /** + * OTLP 'raw' logs. + */ + @JsonProperty("logsRaw") + LOGS_RAW, + /** + * OTLP 'flattened' log. + */ + @JsonProperty("logsFlat") + LOGS_FLAT, + /** + * OTLP 'raw' metrics. + */ + @JsonProperty("metricsRaw") + METRICS_RAW, + /** + * OTLP 'flattened' metric data point. + */ + @JsonProperty("metricsFlat") + METRICS_FLAT, + /** + * OTLP 'raw' traces. + */ + @JsonProperty("tracesRaw") + TRACES_RAW, + /** + * OTLP 'flattened' trace span. + */ + @JsonProperty("tracesFlat") + TRACES_FLAT, + /** + * OTLP 'raw' profiles. + */ + @JsonProperty("profilesRaw") + PROFILES_RAW, + /** + * OTLP 'flattened' profile sample value. + */ + @JsonProperty("profilesFlat") + PROFILES_FLAT + } + + /** + * The configured input signal data type. + */ + private OTLPSignalType inputSignal; + + /** + * Create a new {@link OTLPInputFormat} for a given + * signal type ({@link OTLPSignalType}). + * + * @param otlpSignalType the type of input data to be used + */ + @JsonCreator + public OTLPInputFormat( + @JsonProperty("otlpInputSignal") + final OTLPSignalType otlpSignalType) { + this.inputSignal = otlpSignalType; + } + + /** + * Get the configured {@link OTLPSignalType}. 
+ * + * @return the type used by this {@link OTLPInputFormat} + */ + @JsonProperty + public OTLPSignalType getOtlpInputSignal() { + return inputSignal; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isSplittable() { + return false; + } + + /** + * {@inheritDoc} + */ + @Override + public InputEntityReader createReader( + final InputRowSchema inputRowSchema, + final InputEntity source, + final File temporaryDirectory) { + // use to filter out columns: + inputRowSchema.getColumnsFilter(); + inputRowSchema.getDimensionsSpec(); + inputRowSchema.getTimestampSpec(); + inputRowSchema.getMetricNames(); + + switch (getOtlpInputSignal()) { + case LOGS_FLAT: + return new LogsReader(source, false); + case LOGS_RAW: + return new LogsReader(source, true); + case METRICS_FLAT: + return new MetricsReader(source, false); + case METRICS_RAW: + return new MetricsReader(source, true); + case TRACES_FLAT: + return new TracesReader(source, false); + case TRACES_RAW: + return new TracesReader(source, true); + case PROFILES_FLAT: + return new ProfilesReader(source, false); + case PROFILES_RAW: + return new ProfilesReader(source, true); + default: + // should not happen + throw new UnsupportedOperationException("Internal error"); + } + } + + /** + * {@inheritDoc} + */ + @Override + public int hashCode() { + return Objects.hash(getOtlpInputSignal()); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + OTLPInputFormat o = (OTLPInputFormat) obj; + + return Objects.equals(getOtlpInputSignal(), o.getOtlpInputSignal()); + } +} diff --git a/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/OTLPInputFormatModule.java b/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/OTLPInputFormatModule.java new file mode 100644 index 0000000..1e8ff41 --- /dev/null +++ 
b/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/OTLPInputFormatModule.java @@ -0,0 +1,59 @@ +/* + * Copyright 2024 Mishmash IO UK Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.mishmash.opentelemetry.druid.format; + +import java.util.Collections; +import java.util.List; + +import org.apache.druid.initialization.DruidModule; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.inject.Binder; + +/** + * A {@link DruidModule} for the OTLP Input Format. + * + * Installs necessary modules, such as {@link OTLPInputFormat}. 
+ */ +public class OTLPInputFormatModule implements DruidModule { + + /** + * {@inheritDoc} + */ + @Override + public void configure(final Binder binder) { + // nothing to do + } + + /** + * {@inheritDoc} + */ + @Override + public List getJacksonModules() { + return Collections.singletonList( + new SimpleModule() + .registerSubtypes( + new NamedType( + OTLPInputFormat.class, + "otlp") + ) + ); + } +} diff --git a/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/ProfilesReader.java b/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/ProfilesReader.java new file mode 100644 index 0000000..72a4a4e --- /dev/null +++ b/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/ProfilesReader.java @@ -0,0 +1,172 @@ +/* + * Copyright 2024 Mishmash IO UK Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package io.mishmash.opentelemetry.druid.format; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.IntermediateRowParsingReader; +import org.apache.druid.java.util.common.parsers.CloseableIteratorWithMetadata; +import org.apache.druid.java.util.common.parsers.ParseException; + +import io.mishmash.opentelemetry.persistence.proto.ProtobufProfiles; +import io.mishmash.opentelemetry.persistence.proto.v1.ProfilesPersistenceProto.PersistedProfile; +import io.mishmash.opentelemetry.server.collector.ProfileSampleValue; +import io.mishmash.opentelemetry.server.collector.ProfilesFlattener; +import io.opentelemetry.proto.collector.profiles.v1experimental.ExportProfilesServiceRequest; + +/** + * An {@link IntermediateRowParsingReader} for OpenTelemetry profiles signals. + * + * Takes an {@link InputEntity} containing a 'raw' protobuf OTLP + * {@link ExportProfilesServiceRequest} and 'flattens' it using + * {@link ProfilesFlattener}, or takes a single (already flattened) + * {@link PersistedProfile} protobuf message. + */ +public class ProfilesReader + extends IntermediateRowParsingReader { + + /** + * The source to read data from. + */ + private InputEntity source; + /** + * True if the 'raw' format was configured. + */ + private boolean isRaw = false; + + /** + * Create an OTLP profiles reader. 
+ * + * @param input the {@link InputEntity} containing protobuf-encoded bytes + * @param isRawFormat true if input contains a 'raw' + * {@link ExportProfilesServiceRequest} + */ + public ProfilesReader( + final InputEntity input, + final boolean isRawFormat) { + this.source = input; + this.isRaw = isRawFormat; + } + + /** + * {@inheritDoc} + */ + @Override + protected List parseInputRows( + final PersistedProfile intermediateRow) + throws IOException, ParseException { + // TODO Auto-generated method stub + return null; + } + + /** + * {@inheritDoc} + */ + @Override + protected List> toMap( + final PersistedProfile intermediateRow) throws IOException { + return Collections.singletonList( + ProtobufProfiles.toJsonMap(intermediateRow)); + } + + /** + * {@inheritDoc} + */ + @Override + protected CloseableIteratorWithMetadata + intermediateRowIteratorWithMetadata() throws IOException { + try (InputStream is = source().open()) { + if (isRaw) { + ExportProfilesServiceRequest req = + ExportProfilesServiceRequest.parseFrom(is); + + return new RawIterator<>( + new ProfilesFlattener(null, null, req, null)) { + @Override + public PersistedProfile convert( + final ProfileSampleValue element) { + return ProtobufProfiles + .buildProfileSampleValue(element).build(); + } + + @Override + public void initMetaFor( + final PersistedProfile element) { + initIteratorMeta(this, element); + } + }; + } else { + PersistedProfile p = PersistedProfile.parseFrom(is); + + return new FlatIterator<>(p) { + @Override + public void initMetaFor( + final PersistedProfile element) { + initIteratorMeta(this, element); + } + }; + } + } + } + + /** + * {@inheritDoc} + */ + @Override + protected String intermediateRowAsString(final PersistedProfile row) { + return row.toString(); + } + + /** + * {@inheritDoc} + */ + @Override + protected InputEntity source() { + return source; + } + + /** + * Initialize the metadata associated with the given profile. 
+ * + * A {@link CloseableIteratorWithMetadata} has to provide metadata + * for each element it returns through its next() method. This + * metadata is computed here, before next() returns. + * + * @param it - the iterator requesting metadata + * @param profile - the new element to be returned by next() + */ + protected void initIteratorMeta( + final BaseIterator it, + final PersistedProfile profile) { + it.clearMeta(); + + it.setMeta("batchUUID", profile.getBatchUUID()); + it.setMeta("batchTimestamp", profile.getBatchTimestamp()); + it.setMeta("resourceSeqNo", profile.getResourceSeqNo()); + it.setMeta("scopeSeqNo", profile.getScopeSeqNo()); + it.setMeta("profileSeqNo", profile.getProfileSeqNo()); + it.setMeta("sampleSeqNo", profile.getSampleSeqNo()); + it.setMeta("valueSeqNo", profile.getValueSeqNo()); + } +} diff --git a/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/RawIterator.java b/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/RawIterator.java new file mode 100644 index 0000000..c177921 --- /dev/null +++ b/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/RawIterator.java @@ -0,0 +1,76 @@ +/* + * Copyright 2024 Mishmash IO UK Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.mishmash.opentelemetry.druid.format; + +import java.util.Iterator; + +/** + * A {@link BaseIterator} that wraps around an {@link Iterable} of + * elements of a different type. 
Converts these elements before + * returning them via {@link #next()}. + * + * @param the type of elements of this iterator + * @param the {@link Iterable} elements type (to be converted) + */ +public abstract class RawIterator extends BaseIterator { + + /** + * An iterator obtained from the wrapped {@link Iterable}. + */ + private Iterator elements; + + /** + * Create a new {@link RawIterator}. + * + * @param wrap the {@link Iterable} to wrap + */ + public RawIterator(final Iterable wrap) { + this.elements = wrap.iterator(); + } + + /** + * Convert an input element to its output type. + * + * Called by {@link #next()} before returning. + * + * @param element the input element + * @return the element to be returned by {@link #next()} + */ + public abstract T convert(I element); + + /** + * {@inheritDoc} + */ + @Override + public boolean hasNext() { + return elements.hasNext(); + } + + /** + * {@inheritDoc} + */ + @Override + public T next() { + I n = elements.next(); + T t = convert(n); + + initMetaFor(t); + + return t; + } +} diff --git a/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/TracesReader.java b/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/TracesReader.java new file mode 100644 index 0000000..d9a2680 --- /dev/null +++ b/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/TracesReader.java @@ -0,0 +1,165 @@ +/* + * Copyright 2024 Mishmash IO UK Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.mishmash.opentelemetry.druid.format; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.IntermediateRowParsingReader; +import org.apache.druid.java.util.common.parsers.CloseableIteratorWithMetadata; +import org.apache.druid.java.util.common.parsers.ParseException; + +import io.mishmash.opentelemetry.persistence.proto.ProtobufSpans; +import io.mishmash.opentelemetry.persistence.proto.v1.TracesPersistenceProto.PersistedSpan; +import io.mishmash.opentelemetry.server.collector.Span; +import io.mishmash.opentelemetry.server.collector.TracesFlattener; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; + +/** + * An {@link IntermediateRowParsingReader} for OpenTelemetry traces signals. + * + * Takes an {@link InputEntity} containing a 'raw' protobuf OTLP + * {@link ExportTraceServiceRequest} and 'flattens' it using + * {@link TracesFlattener}, or takes a single (already flattened) + * {@link PersistedSpan} protobuf message. + */ +public class TracesReader extends IntermediateRowParsingReader { + + /** + * The source to read data from. + */ + private InputEntity source; + /** + * True if the 'raw' format was configured. + */ + private boolean isRaw = false; + + /** + * Create an OTLP logs reader. 
+ * + * @param input the {@link InputEntity} containing protobuf-encoded bytes + * @param isRawFormat true if input contains a 'raw' + * {@link ExportTraceServiceRequest} + */ + public TracesReader( + final InputEntity input, + final boolean isRawFormat) { + this.source = input; + this.isRaw = isRawFormat; + } + + /** + * {@inheritDoc} + */ + @Override + protected List parseInputRows( + final PersistedSpan intermediateRow) + throws IOException, ParseException { + // TODO Auto-generated method stub + return null; + } + + /** + * {@inheritDoc} + */ + @Override + protected List> toMap( + final PersistedSpan intermediateRow) throws IOException { + return Collections.singletonList( + ProtobufSpans.toJsonMap(intermediateRow)); + } + + /** + * {@inheritDoc} + */ + @Override + protected CloseableIteratorWithMetadata + intermediateRowIteratorWithMetadata() throws IOException { + try (InputStream is = source().open()) { + if (isRaw) { + ExportTraceServiceRequest req = + ExportTraceServiceRequest.parseFrom(is); + + return new RawIterator<>( + new TracesFlattener(null, null, req, null)) { + @Override + public PersistedSpan convert(final Span element) { + return ProtobufSpans.buildSpan(element).build(); + } + + @Override + public void initMetaFor( + final PersistedSpan element) { + initIteratorMeta(this, element); + } + }; + } else { + PersistedSpan span = PersistedSpan.parseFrom(is); + + return new FlatIterator<>(span) { + @Override + public void initMetaFor( + final PersistedSpan element) { + initIteratorMeta(this, element); + } + }; + } + } + } + + /** + * {@inheritDoc} + */ + @Override + protected String intermediateRowAsString(final PersistedSpan row) { + return row.toString(); + } + + /** + * {@inheritDoc} + */ + @Override + protected InputEntity source() { + return source; + } + + /** + * Initialize the metadata associated with the given trace span. 
+ * + * A {@link CloseableIteratorWithMetadata} has to provide metadata + * for each element it returns through its next() method. This + * metadata is computed here, before next() returns. + * + * @param it - the iterator requesting metadata + * @param span - the new element to be returned by next() + */ + protected void initIteratorMeta( + final BaseIterator it, + final PersistedSpan span) { + it.clearMeta(); + + it.setMeta("batchUUID", span.getBatchUUID()); + it.setMeta("batchTimestamp", span.getBatchTimestamp()); + it.setMeta("seqNo", span.getSeqNo()); + } +} diff --git a/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/package-info.java b/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/package-info.java new file mode 100644 index 0000000..174202f --- /dev/null +++ b/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/package-info.java @@ -0,0 +1,48 @@ +/* + * Copyright 2024 Mishmash IO UK Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/** + *

OpenTelemetry input format for Apache Druid

+ *

+ * Use this input format to ingest OTLP (OpenTelemetry protocol) data + * into Apache Druid. + *

+ *

+ * Combine with an input source (Kafka, etc). + *

+ *

+ * For more information on how to use this input format visit + * @see + * OTLP Input Format for Apache Druid on GitHub. + *

+ * Apache Druid is a high performance, real-time analytics database that + * delivers sub-second queries on streaming and batch data at scale + * and under load. These features make it perfect for OpenTelemetry analytics. + * To find out more about Apache Druid + * @see visit its website. + *

+ * This package is part of a broader set of OpenTelemetry-related activities + * at mishmash io. To find out more about this and other packages visit the + * links below. + *

+ * @see + * OpenTelemetry at mishmash io + * @see + * This package on GitHub + * @see mishmash io + */ +package io.mishmash.opentelemetry.druid.format; diff --git a/druid-input-format/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/druid-input-format/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule new file mode 100644 index 0000000..12d64ac --- /dev/null +++ b/druid-input-format/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule @@ -0,0 +1,15 @@ +# Copyright 2024 Mishmash IO UK Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +io.mishmash.opentelemetry.druid.format.OTLPInputFormatModule diff --git a/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufLogs.java b/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufLogs.java index 2b4fa4f..8df8b45 100644 --- a/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufLogs.java +++ b/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufLogs.java @@ -17,6 +17,8 @@ package io.mishmash.opentelemetry.persistence.proto; +import java.util.Map; + import io.mishmash.opentelemetry.persistence.proto.v1.LogsPersistenceProto.PersistedLog; import io.mishmash.opentelemetry.server.collector.Log; import io.opentelemetry.proto.common.v1.AnyValue; @@ -125,4 +127,15 @@ public static PersistedLog.Builder buildLog(final Log log) { return builder; } + + /** + * Convert a {@link PersistedLog} to a {@link Map} suitable for JSON + * encoding. + * + * @param log the persisted log protobuf message + * @return the {@link Map} + */ + public static Map toJsonMap(final PersistedLog log) { + return ProtobufUtils.toJsonMap(log.getAllFields()); + } } diff --git a/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufMetrics.java b/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufMetrics.java index d077f06..e7488e8 100644 --- a/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufMetrics.java +++ b/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufMetrics.java @@ -17,6 +17,8 @@ package io.mishmash.opentelemetry.persistence.proto; +import java.util.Map; + import io.mishmash.opentelemetry.persistence.proto.v1.MetricsPersistenceProto.PersistedMetric; import io.mishmash.opentelemetry.server.collector.MetricDataPoint; import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint; @@ -206,4 +208,16 
@@ public static PersistedMetric.Builder buildMetricDataPoint( return builder; } + + /** + * Convert a {@link PersistedMetric} to a {@link Map} suitable for JSON + * encoding. + * + * @param metric the persisted metric protobuf message + * @return the {@link Map} + */ + public static Map toJsonMap( + final PersistedMetric metric) { + return ProtobufUtils.toJsonMap(metric.getAllFields()); + } } diff --git a/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufProfiles.java b/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufProfiles.java index 91b31e6..848ffd8 100644 --- a/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufProfiles.java +++ b/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufProfiles.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import io.mishmash.opentelemetry.persistence.proto.v1.ProfilesPersistenceProto.KeyValueUnit; import io.mishmash.opentelemetry.persistence.proto.v1.ProfilesPersistenceProto.PersistedProfile; @@ -441,4 +442,16 @@ public static List toStr( .map(l -> getStrAt(p, l)) .toList(); } + + /** + * Convert a {@link PersistedProfile} to a {@link Map} suitable for JSON + * encoding. 
+ * + * @param profile the persisted profile protobuf message + * @return the {@link Map} + */ + public static Map toJsonMap( + final PersistedProfile profile) { + return ProtobufUtils.toJsonMap(profile.getAllFields()); + } } diff --git a/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufSpans.java b/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufSpans.java index 9278479..f7673ac 100644 --- a/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufSpans.java +++ b/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufSpans.java @@ -17,6 +17,8 @@ package io.mishmash.opentelemetry.persistence.proto; +import java.util.Map; + import io.mishmash.opentelemetry.persistence.proto.v1.TracesPersistenceProto.PersistedSpan; import io.mishmash.opentelemetry.server.collector.Span; @@ -105,4 +107,16 @@ public static PersistedSpan.Builder buildSpan(final Span span) { return builder; } + + /** + * Convert a {@link PersistedSpan} to a {@link Map} suitable for JSON + * encoding. + * + * @param span the persisted span protobuf message + * @return the {@link Map} + */ + public static Map toJsonMap( + final PersistedSpan span) { + return ProtobufUtils.toJsonMap(span.getAllFields()); + } } diff --git a/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufUtils.java b/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufUtils.java new file mode 100644 index 0000000..c7e281f --- /dev/null +++ b/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufUtils.java @@ -0,0 +1,202 @@ +/* + * Copyright 2024 Mishmash IO UK Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.mishmash.opentelemetry.persistence.proto; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +import com.google.protobuf.BoolValue; +import com.google.protobuf.ByteString; +import com.google.protobuf.BytesValue; +import com.google.protobuf.Descriptors.EnumValueDescriptor; +import com.google.protobuf.Descriptors.FieldDescriptor; +import com.google.protobuf.DoubleValue; +import com.google.protobuf.FloatValue; +import com.google.protobuf.Int32Value; +import com.google.protobuf.Int64Value; +import com.google.protobuf.Message; +import com.google.protobuf.StringValue; +import com.google.protobuf.UInt32Value; +import com.google.protobuf.UInt64Value; + +/** + * Various helper methods for general protobuf message handling. + */ +public final class ProtobufUtils { + + /** + * Custom converters for some protobuf message types. + * + * Helps with avoiding unnecessary nesting. 
+ */ + private static Map> converters; + + static { + converters.put( + BoolValue.getDescriptor().getFullName(), + ProtobufUtils::toJsonNestedValue); + converters.put( + Int32Value.getDescriptor().getFullName(), + ProtobufUtils::toJsonNestedValue); + converters.put( + UInt32Value.getDescriptor().getFullName(), + ProtobufUtils::toJsonNestedValue); + converters.put( + Int64Value.getDescriptor().getFullName(), + ProtobufUtils::toJsonNestedValue); + converters.put( + UInt64Value.getDescriptor().getFullName(), + ProtobufUtils::toJsonNestedValue); + converters.put( + StringValue.getDescriptor().getFullName(), + ProtobufUtils::toJsonNestedValue); + converters.put( + BytesValue.getDescriptor().getFullName(), + ProtobufUtils::toJsonNestedValue); + converters.put( + FloatValue.getDescriptor().getFullName(), + ProtobufUtils::toJsonNestedValue); + converters.put( + DoubleValue.getDescriptor().getFullName(), + ProtobufUtils::toJsonNestedValue); + + } + + private ProtobufUtils() { + // constructor is hidden + } + + /** + * Convert a {@link Map} of fields into a JSON-friendly map. + * + * Use to convert all fields of a {@link Message}. 
+ * + * @param entries the protobuf fields + * @return a JSON-friendly {@link Map} + */ + public static Map toJsonMap( + final Map entries) { + Map res = new HashMap<>(entries.size()); + + for (Map.Entry ent : entries.entrySet()) { + res.put(ent.getKey().getJsonName(), + toJsonValue(ent.getKey(), ent.getValue())); + } + + return res; + } + + private static Object toJsonValue( + final FieldDescriptor field, + final Object value) { + if (value == null) { + return null; + } + + if (value instanceof Message) { + Message msg = (Message) value; + Function converter = + converters.get(msg.getDescriptorForType().getFullName()); + + if (converter != null) { + return converter.apply(msg); + } + } + + if (field.isMapField()) { + FieldDescriptor k = field + .getMessageType() + .findFieldByName("key"); + FieldDescriptor v = field + .getMessageType() + .findFieldByName("value"); + + if (k == null || v == null) { + throw new IllegalArgumentException( + String.format("Malformed protobuf field '%s'", + field.getFullName())); + } + + List elementsList = (List) value; + Map resMap = new HashMap<>(elementsList.size()); + + for (Object element : elementsList) { + resMap.put( + (String) toJsonValueSingle( + k, + ((Message) element).getField(k)), + toJsonValueSingle( + v, + ((Message) element).getField(v))); + } + + return resMap; + } else if (field.isRepeated()) { + List valuesList = (List) value; + List resList = new ArrayList<>(valuesList.size()); + + for (Object o : valuesList) { + resList.add(toJsonValueSingle(field, o)); + } + + return resList; + } else { + return toJsonValueSingle(field, value); + } + } + + private static Object toJsonValueSingle( + final FieldDescriptor field, + final Object value) { + switch (field.getType()) { + case BYTES: + return ((ByteString) value).toByteArray(); + case ENUM: + if ("google.protobuf.NullValue" + .equals(field.getEnumType().getFullName())) { + return null; + } + + return ((EnumValueDescriptor) value).getName(); + case GROUP: + case MESSAGE: 
+ if (value == null) { + return null; + } + + return toJsonMap(((Message) value).getAllFields()); + default: + return value; + } + } + + private static Object toJsonNestedValue(final Message m) { + FieldDescriptor v = m.getDescriptorForType().findFieldByName("value"); + + if (v == null) { + throw new IllegalArgumentException( + String.format("Malformed protobuf message '%s'", + m.getDescriptorForType().getFullName())); + } + + return toJsonValueSingle(v, m.getField(v)); + } +} diff --git a/pom.xml b/pom.xml index 6a09a79..781c81f 100644 --- a/pom.xml +++ b/pom.xml @@ -80,6 +80,7 @@ collector-embedded persistence-protobuf server-parquet + druid-input-format