From fd676b5863d8275aa261c1cbc2947da3bbcb2291 Mon Sep 17 00:00:00 2001 From: zzuljin Date: Tue, 15 Oct 2024 13:36:57 +0300 Subject: [PATCH 1/2] Improve druid input format usability --- .../collector/MetricsFlattenerTests.java | 4 - .../druid/format/LogsReader.java | 42 +++- .../druid/format/MetricsReader.java | 238 +++++++++++++++++- .../druid/format/ProfilesReader.java | 74 +++++- .../druid/format/TracesReader.java | 42 +++- .../persistence/protobuf/ProtobufUtils.java | 9 +- 6 files changed, 394 insertions(+), 15 deletions(-) diff --git a/collector-embedded/src/test/java/io/mishmash/opentelemetry/server/collector/MetricsFlattenerTests.java b/collector-embedded/src/test/java/io/mishmash/opentelemetry/server/collector/MetricsFlattenerTests.java index fbaa284..f4cb0a8 100644 --- a/collector-embedded/src/test/java/io/mishmash/opentelemetry/server/collector/MetricsFlattenerTests.java +++ b/collector-embedded/src/test/java/io/mishmash/opentelemetry/server/collector/MetricsFlattenerTests.java @@ -26,12 +26,8 @@ import org.junit.jupiter.api.Test; -import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.proto.common.v1.KeyValue; -import io.opentelemetry.proto.logs.v1.LogRecord; -import io.opentelemetry.proto.logs.v1.ResourceLogs; -import io.opentelemetry.proto.logs.v1.ScopeLogs; import io.opentelemetry.proto.metrics.v1.Gauge; import io.opentelemetry.proto.metrics.v1.Metric; import io.opentelemetry.proto.metrics.v1.NumberDataPoint; diff --git a/druid-otlp-format/src/main/java/io/mishmash/opentelemetry/druid/format/LogsReader.java b/druid-otlp-format/src/main/java/io/mishmash/opentelemetry/druid/format/LogsReader.java index 3a434b1..40b212f 100644 --- a/druid-otlp-format/src/main/java/io/mishmash/opentelemetry/druid/format/LogsReader.java +++ b/druid-otlp-format/src/main/java/io/mishmash/opentelemetry/druid/format/LogsReader.java @@ -20,8 +20,10 @@ import java.io.IOException; import java.io.InputStream; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputRow; @@ -31,6 +33,8 @@ import org.apache.druid.java.util.common.parsers.CloseableIteratorWithMetadata; import org.apache.druid.java.util.common.parsers.ParseException; +import com.google.protobuf.Descriptors.FieldDescriptor; + import io.mishmash.opentelemetry.persistence.proto.v1.LogsPersistenceProto.PersistedLog; import io.mishmash.opentelemetry.persistence.protobuf.ProtobufLogs; import io.mishmash.opentelemetry.server.collector.Log; @@ -59,6 +63,10 @@ public class LogsReader extends IntermediateRowParsingReader { * The ingestion schema config. */ private InputRowSchema schema; + /** + * Keeps a reference to all possible dimensions. + */ + private static Set logDimensions; /** * Create an OTLP logs reader. @@ -86,7 +94,11 @@ protected List parseInputRows( throws IOException, ParseException { return Collections.singletonList( MapInputRowParser.parse( - schema, + schema.getTimestampSpec(), + MapInputRowParser.findDimensions( + schema.getTimestampSpec(), + schema.getDimensionsSpec(), + allDimensions()), ProtobufLogs.toJsonMap(intermediateRow, false))); } @@ -177,4 +189,32 @@ protected void initIteratorMeta( it.setMeta("batchTimestamp", log.getBatchTimestamp()); it.setMeta("seqNo", log.getSeqNo()); } + + /** + * Return all possible dimensions names. 
+ * + * This method is used to compute a full list of dimension names + * (including optional values that might be missing) when supplying rows. + * + * @return a {@link Set} of all possible dimension names + */ + protected Set allDimensions() { + if (logDimensions == null) { + logDimensions = new HashSet<>(); + + for (FieldDescriptor field : PersistedLog + .getDescriptor() + .getFields()) { + /* + * Exclude dimensions that might have been reconfigured + * as metrics instead. + */ + if (!schema.getMetricNames().contains(field.getName())) { + logDimensions.add(field.getName()); + } + } + } + + return logDimensions; + } } diff --git a/druid-otlp-format/src/main/java/io/mishmash/opentelemetry/druid/format/MetricsReader.java b/druid-otlp-format/src/main/java/io/mishmash/opentelemetry/druid/format/MetricsReader.java index cfddb67..78da77b 100644 --- a/druid-otlp-format/src/main/java/io/mishmash/opentelemetry/druid/format/MetricsReader.java +++ b/druid-otlp-format/src/main/java/io/mishmash/opentelemetry/druid/format/MetricsReader.java @@ -19,9 +19,13 @@ import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputRow; @@ -31,11 +35,14 @@ import org.apache.druid.java.util.common.parsers.CloseableIteratorWithMetadata; import org.apache.druid.java.util.common.parsers.ParseException; +import com.google.protobuf.Descriptors.FieldDescriptor; + import io.mishmash.opentelemetry.persistence.proto.v1.MetricsPersistenceProto.PersistedMetric; import io.mishmash.opentelemetry.persistence.protobuf.ProtobufMetrics; import io.mishmash.opentelemetry.server.collector.MetricDataPoint; import io.mishmash.opentelemetry.server.collector.MetricsFlattener; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; +import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint.Buckets; /** * An {@link IntermediateRowParsingReader} for OpenTelemetry metrics signals. @@ -48,6 +55,38 @@ public class MetricsReader extends IntermediateRowParsingReader { + /** + * The name of the column for the 'extracted' histogram values. + */ + private static final String COL_EXTRACTED_HISTOGRAM = "histogram"; + /** + * The name of the column for the 'extracted' exponential histogram values. + */ + private static final String COL_EXTRACTED_EXPONENTIAL_HISTOGRAM = + "exponential_histogram"; + /** + * The list of default candidates for metrics columns. + */ + private static final String[] DEFAULT_METRIC_NAMES = new String[] { + "gauge_double", + "gauge_int", + "sum_double", + "sum_int", + "histogram_count", + "histogram_sum", + "histogram_min", + "histogram_max", + "exponential_histogram_count", + "exponential_histogram_sum", + "exponential_histogram_scale", + "exponential_histogram_zero_count", + "exponential_histogram_min", + "exponential_histogram_max", + "exponential_histogram_zero_threshold", + "summary_count", + "summary_sum" + }; + /** * The source to read data from. */ @@ -60,6 +99,14 @@ public class MetricsReader * The ingestion schema config. */ private InputRowSchema schema; + /** + * Keeps a reference to all possible dimension column names. + */ + private Set metricsDimensions; + /** + * Keeps a reference to all possible metric column names. + */ + private Set metricsMetrics; /** * Create an OTLP metrics reader. 
@@ -76,6 +123,29 @@ public MetricsReader( this.schema = rowSchema; this.source = input; this.isRaw = isRawFormat; + + metricsMetrics = new HashSet<>(); + + /* + * Before we configure the defaults for metrics - + * make sure a column is not already considered a + * dimension instead. + */ + Set configuredDimensions = new HashSet<>(); + configuredDimensions.addAll(rowSchema + .getDimensionsSpec() + .getDimensionNames()); + configuredDimensions.addAll(rowSchema + .getDimensionsSpec() + .getDimensionExclusions()); + + for (int i = 0; i < DEFAULT_METRIC_NAMES.length; i++) { + String metricName = DEFAULT_METRIC_NAMES[i]; + + if (!configuredDimensions.contains(metricName)) { + metricsMetrics.add(metricName); + } + } } /** @@ -85,10 +155,19 @@ public MetricsReader( protected List parseInputRows( final PersistedMetric intermediateRow) throws IOException, ParseException { + Map rowMap = ProtobufMetrics + .toJsonMap(intermediateRow, false); + + rowMap.putAll(extractBuckets(intermediateRow, false)); + return Collections.singletonList( MapInputRowParser.parse( - schema, - ProtobufMetrics.toJsonMap(intermediateRow, false))); + schema.getTimestampSpec(), + MapInputRowParser.findDimensions( + schema.getTimestampSpec(), + schema.getDimensionsSpec(), + allDimensions()), + rowMap)); } /** @@ -101,8 +180,12 @@ protected List> toMap( * This is called when Druid is sampling data, so, provide defaults * for missing fields. */ - return Collections.singletonList( - ProtobufMetrics.toJsonMap(intermediateRow, true)); + Map res = ProtobufMetrics + .toJsonMap(intermediateRow, true); + + res.putAll(extractBuckets(intermediateRow, true)); + + return Collections.singletonList(res); } /** @@ -181,4 +264,151 @@ protected void initIteratorMeta( it.setMeta("metricSeqNo", metric.getSeqNo()); it.setMeta("dataPointSeqNo", metric.getDatapointSeqNo()); } + + /** + * Return all possible dimensions names. + * + * This method is used to compute a full list of dimension names + * (including optional values that might be missing) when supplying rows. + * + * @return a {@link Set} of all possible dimension names + */ + protected Set allDimensions() { + if (metricsDimensions == null) { + metricsDimensions = new HashSet<>(); + + for (FieldDescriptor field : PersistedMetric + .getDescriptor() + .getFields()) { + String fieldName = field.getName(); + if (!metricsMetrics.contains(fieldName) + && !schema.getMetricNames().contains(fieldName)) { + metricsDimensions.add(fieldName); + } + } + + if (!metricsMetrics.contains(COL_EXTRACTED_HISTOGRAM) + && !schema + .getMetricNames() + .contains(COL_EXTRACTED_HISTOGRAM)) { + metricsDimensions.add(COL_EXTRACTED_HISTOGRAM); + } + + if (!metricsMetrics.contains(COL_EXTRACTED_EXPONENTIAL_HISTOGRAM) + && !schema + .getMetricNames() + .contains(COL_EXTRACTED_EXPONENTIAL_HISTOGRAM)) { + metricsDimensions.add(COL_EXTRACTED_EXPONENTIAL_HISTOGRAM); + } + } + + return metricsDimensions; + } + + /** + * Extract some metrics types into more useful columns. + * + * For example - this method extracts the bucket counts of Histogram + * metrics into a single 'array' column, where elements contain the + * bucket bounds and the count. 
+ * + * @param metric the metric being processed + * @param withDefaults if defaults for missing values should be added + * @return a {@link Map} with the new, extracted columns (or defaults) + */ + protected Map extractBuckets( + final PersistedMetric metric, + final boolean withDefaults) { + Map res = new LinkedHashMap<>(); + + if (withDefaults) { + res.put(COL_EXTRACTED_HISTOGRAM, + Collections.emptyList()); + res.put(COL_EXTRACTED_EXPONENTIAL_HISTOGRAM, + Collections.emptyList()); + } + + switch (metric.getType()) { + case "HISTOGRAM": + if (metric.getHistogramBucketCountsCount() + != metric.getHistogramExplicitBoundsCount() + 1) { + throw new IllegalArgumentException( + "Illegal metric bucket array sizes"); + } + + Double prevBound = null; + List> histogram = + new ArrayList<>(metric.getHistogramBucketCountsCount()); + Map bucket; + + for (int i = 0; + i < metric.getHistogramExplicitBoundsCount(); + i++) { + bucket = new LinkedHashMap<>(); + + bucket.put("lower_bound", prevBound); + bucket.put("upper_bound", + prevBound = metric.getHistogramExplicitBounds(i)); + bucket.put("count", metric.getHistogramBucketCounts(i)); + + histogram.add(bucket); + } + + bucket = new LinkedHashMap<>(); + + bucket.put("lower_bound", prevBound); + bucket.put("upper_bound", null); + bucket.put("count", + metric.getHistogramBucketCounts( + metric.getHistogramBucketCountsCount() - 1)); + + histogram.add(bucket); + + res.put(COL_EXTRACTED_HISTOGRAM, histogram); + + break; + + case "EXPONENTIAL_HISTOGRAM": + double base = Math.pow( + 2, + Math.pow(2, + -metric.getExponentialHistogramScale())); + + Buckets negative = metric.getExponentialHistogramNegative(); + Buckets positive = metric.getExponentialHistogramPositive(); + int negativeOffset = negative.getOffset(); + int positiveOffset = positive.getOffset(); + List> exp = new ArrayList<>( + negative.getBucketCountsCount() + + positive.getBucketCountsCount()); + + for (int i = 0; i < negative.getBucketCountsCount(); i++) { + Map eb = new LinkedHashMap<>(); + + eb.put("lower_bound", -Math.pow(base, negativeOffset + i)); + eb.put("upper_bound", -Math.pow(base, negativeOffset + i + 1)); + eb.put("count", negative.getBucketCounts(i)); + + exp.add(eb); + } + + for (int i = 0; i < positive.getBucketCountsCount(); i++) { + Map eb = new LinkedHashMap<>(); + + eb.put("lower_bound", Math.pow(base, positiveOffset + i)); + eb.put("upper_bound", Math.pow(base, positiveOffset + i + 1)); + eb.put("count", positive.getBucketCounts(i)); + + exp.add(eb); + } + + res.put(COL_EXTRACTED_EXPONENTIAL_HISTOGRAM, exp); + + break; + + default: + } + + return res; + } } diff --git a/druid-otlp-format/src/main/java/io/mishmash/opentelemetry/druid/format/ProfilesReader.java b/druid-otlp-format/src/main/java/io/mishmash/opentelemetry/druid/format/ProfilesReader.java index d2cb7e5..915f665 100644 --- a/druid-otlp-format/src/main/java/io/mishmash/opentelemetry/druid/format/ProfilesReader.java +++ b/druid-otlp-format/src/main/java/io/mishmash/opentelemetry/druid/format/ProfilesReader.java @@ -20,8 +20,10 @@ import java.io.IOException; import java.io.InputStream; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputRow; @@ -31,6 +33,8 @@ import org.apache.druid.java.util.common.parsers.CloseableIteratorWithMetadata; import org.apache.druid.java.util.common.parsers.ParseException; +import com.google.protobuf.Descriptors.FieldDescriptor; + 
import io.mishmash.opentelemetry.persistence.proto.v1.ProfilesPersistenceProto.PersistedProfile; import io.mishmash.opentelemetry.persistence.protobuf.ProtobufProfiles; import io.mishmash.opentelemetry.server.collector.ProfileSampleValue; @@ -48,6 +52,15 @@ public class ProfilesReader extends IntermediateRowParsingReader { + /** + * The list of default candidates for metrics columns. + */ + private static final String[] DEFAULT_METRIC_NAMES = new String[] { + "duration_nanos", + "period", + "value" + }; + /** * The source to read data from. */ @@ -60,6 +73,14 @@ public class ProfilesReader * The ingestion schema config. */ private InputRowSchema schema; + /** + * Keeps a reference to all possible dimensions. + */ + private Set profilesDimensions; + /** + * Keeps a reference to all possible metrics. + */ + private Set profilesMetrics; /** * Create an OTLP profiles reader. @@ -76,6 +97,29 @@ public ProfilesReader( this.schema = rowSchema; this.source = input; this.isRaw = isRawFormat; + + profilesMetrics = new HashSet<>(); + + /* + * Before we configure the defaults for metrics - + * make sure a column is not already considered a + * dimension instead. + */ + Set configuredDimensions = new HashSet<>(); + configuredDimensions.addAll(rowSchema + .getDimensionsSpec() + .getDimensionNames()); + configuredDimensions.addAll(rowSchema + .getDimensionsSpec() + .getDimensionExclusions()); + + for (int i = 0; i < DEFAULT_METRIC_NAMES.length; i++) { + String metricName = DEFAULT_METRIC_NAMES[i]; + + if (!configuredDimensions.contains(metricName)) { + profilesMetrics.add(metricName); + } + } } /** @@ -87,7 +131,11 @@ protected List parseInputRows( throws IOException, ParseException { return Collections.singletonList( MapInputRowParser.parse( - schema, + schema.getTimestampSpec(), + MapInputRowParser.findDimensions( + schema.getTimestampSpec(), + schema.getDimensionsSpec(), + allDimensions()), ProtobufProfiles.toJsonMap(intermediateRow, false))); } @@ -184,4 +232,28 @@ protected void initIteratorMeta( it.setMeta("sampleSeqNo", profile.getSampleSeqNo()); it.setMeta("valueSeqNo", profile.getValueSeqNo()); } + + /** + * Return all possible dimensions names. + * + * This method is used to compute a full list of dimension names + * (including optional values that might be missing) when supplying rows. 
+ * + * @return a {@link Set} of all possible dimension names + */ + protected Set allDimensions() { + if (profilesDimensions == null) { + profilesDimensions = new HashSet<>(); + + for (FieldDescriptor field : PersistedProfile + .getDescriptor() + .getFields()) { + if (!profilesMetrics.contains(field.getName())) { + profilesDimensions.add(field.getName()); + } + } + } + + return profilesDimensions; + } } diff --git a/druid-otlp-format/src/main/java/io/mishmash/opentelemetry/druid/format/TracesReader.java b/druid-otlp-format/src/main/java/io/mishmash/opentelemetry/druid/format/TracesReader.java index 445e236..e3df269 100644 --- a/druid-otlp-format/src/main/java/io/mishmash/opentelemetry/druid/format/TracesReader.java +++ b/druid-otlp-format/src/main/java/io/mishmash/opentelemetry/druid/format/TracesReader.java @@ -20,8 +20,10 @@ import java.io.IOException; import java.io.InputStream; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputRow; @@ -31,6 +33,8 @@ import org.apache.druid.java.util.common.parsers.CloseableIteratorWithMetadata; import org.apache.druid.java.util.common.parsers.ParseException; +import com.google.protobuf.Descriptors.FieldDescriptor; + import io.mishmash.opentelemetry.persistence.proto.v1.TracesPersistenceProto.PersistedSpan; import io.mishmash.opentelemetry.persistence.protobuf.ProtobufSpans; import io.mishmash.opentelemetry.server.collector.Span; @@ -59,6 +63,10 @@ public class TracesReader extends IntermediateRowParsingReader { * The ingestion schema config. */ private InputRowSchema schema; + /** + * Keeps a reference to all possible dimensions. + */ + private Set tracesDimensions; /** * Create an OTLP traces reader. @@ -86,7 +94,11 @@ protected List parseInputRows( throws IOException, ParseException { return Collections.singletonList( MapInputRowParser.parse( - schema, + schema.getTimestampSpec(), + MapInputRowParser.findDimensions( + schema.getTimestampSpec(), + schema.getDimensionsSpec(), + allDimensions()), ProtobufSpans.toJsonMap(intermediateRow, false))); } @@ -177,4 +189,32 @@ protected void initIteratorMeta( it.setMeta("batchTimestamp", span.getBatchTimestamp()); it.setMeta("seqNo", span.getSeqNo()); } + + /** + * Return all possible dimensions names. + * + * This method is used to compute a full list of dimension names + * (including optional values that might be missing) when supplying rows. + * + * @return a {@link Set} of all possible dimension names + */ + protected Set allDimensions() { + if (tracesDimensions == null) { + tracesDimensions = new HashSet<>(); + + for (FieldDescriptor field : PersistedSpan + .getDescriptor() + .getFields()) { + /* + * Exclude dimensions that might have been reconfigured + * as metrics instead. 
+ */ + if (!schema.getMetricNames().contains(field.getName())) { + tracesDimensions.add(field.getName()); + } + } + } + + return tracesDimensions; + } } diff --git a/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/protobuf/ProtobufUtils.java b/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/protobuf/ProtobufUtils.java index de964ae..1aee27e 100644 --- a/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/protobuf/ProtobufUtils.java +++ b/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/protobuf/ProtobufUtils.java @@ -21,7 +21,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import com.google.protobuf.ByteString; @@ -76,7 +76,7 @@ private ProtobufUtils() { public static Map toJsonMap( final Collection> entries, final boolean withDefaults) { - Map res = new HashMap<>(entries.size()); + Map res = new LinkedHashMap<>(entries.size()); for (Map.Entry ent : entries) { res.put(ent.getKey().getName(), @@ -115,7 +115,8 @@ private static Object toJsonValue( } List elementsList = (List) value; - Map resMap = new HashMap<>(elementsList.size()); + Map resMap = + new LinkedHashMap<>(elementsList.size()); for (Object element : elementsList) { resMap.put( @@ -160,7 +161,7 @@ private static Object toJsonValue( return Collections.emptyList(); } - Map res = new HashMap<>(kvs.size()); + Map res = new LinkedHashMap<>(kvs.size()); for (KeyValue kv : kvs) { res.put( From 96af17d7c76f34f7a947e18fde9112d1298191d5 Mon Sep 17 00:00:00 2001 From: Andrey Rusev Date: Tue, 15 Oct 2024 16:30:34 +0300 Subject: [PATCH 2/2] Write druid-otlp-format README --- README.md | 2 +- druid-otlp-format/README.md | 298 ++++++++++++++++++++++++++++++- examples/notebooks/basics.ipynb | 24 +-- examples/notebooks/metrics.ipynb | 12 +- 4 files changed, 315 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 2d01af6..878a478 100644 --- a/README.md +++ b/README.md @@ -73,7 +73,7 @@ If the above sounds convincing - keep reading through this document and explore # OpenTelemetry for Developers, Data Engineers and Data Scientists -We have prepared a few Jupyter notebooks that visually explore OpenTelemetry data that we collected from [a demo Astronomy webshop app](https://github.com/mishmash-io/opentelemetry-demo-to-parquet) +We have prepared a few Jupyter notebooks that visually explore OpenTelemetry data that we collected from [a demo Astronomy webshop app](https://github.com/mishmash-io/opentelemetry-demos) using the [Apache Parquet Stand-alone server](./server-parquet) contained in this repository. If you are the sort of person who prefers to learn by looking at **actual data** - start with the [OpenTelemetry Basics Notebook.](./examples/notebooks/basics.ipynb) diff --git a/druid-otlp-format/README.md b/druid-otlp-format/README.md index e7f2486..ae46e78 100644 --- a/druid-otlp-format/README.md +++ b/druid-otlp-format/README.md @@ -1,7 +1,301 @@ -# Apache Druid Input Format for OpenTelemetry singals +# Apache Druid extension for OpenTelemetry singals ingestion -Coming soon! +This artifact implements an Apache Druid extension that you can use to ingest +[OpenTelemetry](https://opentelemetry.io) signals - `logs`, `metrics`, `traces` and `profiles` - into Apache Druid, and then query Druid through interactive charts and dashboards. + +## What is OpenTelemetry? 
+ +OpenTelemetry is high-quality, ubiquitous, and portable telemetry that enables effective observability. + +It's a newer generation of telemetry systems that builds on the experience gained with earlier +implementations and offers a number of improvements. We, at [mishmash io,](https://mishmash.io) find +OpenTelemetry particularly useful - see +[the reasons here](../README.md#why-you-should-switch-to-opentelemetry), and +[how we use it in our development process here.](https://mishmash.io/open_source/opentelemetry) + +Also, make sure you check [the OpenTelemetry official docs.](https://opentelemetry.io/docs/) + +## Why Apache Druid? + +Apache Druid is a high performance, real-time analytics database that delivers sub-second queries on streaming and batch data at scale and under load. It performs particularly well on `timestamped` +data, and OpenTelemetry signals are heavily timestamped. See [Apache Druid Use Cases](https://druid.apache.org/use-cases) for more. + +# Quick introduction + +To get an idea of why and when to use this Druid extension - here is an example setup: + +1. Setup [Apache Kafka.](https://kafka.apache.org/) +2. Get the [OpenTelemetry collector](https://opentelemetry.io/docs/collector/) and configure it + to export data to Kafka. Use the [OpenTelemetry Kafka Exporter](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/kafkaexporter/README.md) provided by OpenTelemetry. +3. Setup Apache Druid with this artifact as an extension. +4. Inside Druid, configure [Kafka Streaming Ingestions](https://druid.apache.org/docs/latest/ingestion/streaming/) for `logs`, `metrics` and `traces`. + + In the ingestion specs set the `InputFormat` to this extension (more on this below). + + ***Note:*** OpenTelemetry `profiles` signal is still in development and is not supported + by the Kafka Exporter. +5. Setup [Apache Superset](https://superset.apache.org/) with a [Druid database driver.](https://superset.apache.org/docs/configuration/databases#apache-druid) +6. Explore your telemetry in Superset! + +> [!TIP] +> We have prepared a clone of [OpenTelemetry's demo app](https://opentelemetry.io/docs/demo/) with +> the exact same setup as above. +> +> It's quicker to run (no configuration needed) and will also generate telemetry to populate your +> Druid tables. +> +> Get our [Druid ingestion demo app fork here.](https://github.com/mishmash-io/opentelemetry-demos) + +> [!NOTE] +> There are more ways of ingesting OpenTelemetry data into Apache Druid. +> +> Watch [this repository](https://github.com/mishmash-io/opentelemetry-server-embedded) for updates +> as we continue to publish our internal OpenTelemetry-related code and add more examples! + +# Installation + +In order to use this extension you need to install it inside your Druid nodes. This is done +through the typical [extension-loading process](https://druid.apache.org/docs/latest/configuration/extensions/): + +1. Pull the extension: + ```bash + cd /opt/druid && \ + bin/run-java \ + -classpath "lib/*" org.apache.druid.cli.Main tools pull-deps \ + --no-default-hadoop -c "io.mishmash.opentelemetry:druid-otlp-format:1.1.3" + ``` +2. Enable it in the Druid configuration: + ``` + druid.extensions.loadList=[, "druid-otlp-format"] + ``` + +That's all! Now launch your Druid instances and you're ready to ingest OpenTelemetry data! 
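+
+As a quick sanity check, the relevant line of your `common.runtime.properties` could end up looking like the sketch below. It is only an illustration: keep whatever extensions your deployment already loads (the `druid-kafka-indexing-service` entry shown here is an assumption, although you will need it for the Kafka streaming ingestion described above) and append `druid-otlp-format` to the list.
+
+```
+druid.extensions.loadList=["druid-kafka-indexing-service", "druid-otlp-format"]
+```
+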
+ +## Using a Docker image + +We've prepared an [Apache Druid for OpenTelemetry Docker image](https://hub.docker.com/repository/docker/mishmashio/druid-for-opentelemetry) that lets you skip the installation steps above and just +launch a Druid cluster with our OpenTelemetry extenion preinstalled. + +It is based on the [official image](https://druid.apache.org/docs/latest/tutorials/docker/) and is +configured and launched the same way. Just replace the tags with `mishmashio/druid-for-opentelemetry` in your `docker-compose.yaml.` + +Don't forget to enable the extension when launching! + +## Building your own Docker image + +If you're building your own, custom Druid image - just include the `pull-deps` installation step +above. + +***Note:*** The `pull-deps` command needs to open a number of files simultaneously, and the maximum +number of open files inside a container might be limited on your system. On some Linux distributions +`podman`, for example, has limits that are too low. + +If this turns out to be the case on your computer, raise the `number of open files` limit during the +build: ```sh podman build --ulimit nofile=65535:65535 -f Dockerfile .. ``` + +# Configuring ingestion jobs + +Once the `druid-otlp-format` extension is loaded, you can set Druid to start loading OpenTelemetry +data. + +To create a data import task you need to provide Druid with an `ingestion spec`, detailing, among +other things - where it should look for data, what the data format is, etc. + +To apply this extension to your data source - write an `ingestion spec` the way you normally would, +just set the ***InputFormat*** section to: + +- when ingesting `logs` signals: + - when produced by the **OpenTelemetry Kafka Exporter:** + ```json + { + ... + "spec": { + "ioConfig": { + ... + "inputFormat": { + "type": "otlp", + "otlpInputSignal": "logsRaw" + }, + ... + }, + ... + } + ... + } + ``` + - when produced by other modules published by **mishmash io:** + ```json + { + ... + "spec": { + "ioConfig": { + ... + "inputFormat": { + "type": "otlp", + "otlpInputSignal": "logsFlat" + }, + ... + }, + ... + } + ... + } + ``` +- when ingesting `metrics` signals: + - when produced by the **OpenTelemetry Kafka Exporter:** + ```json + { + ... + "spec": { + "ioConfig": { + ... + "inputFormat": { + "type": "otlp", + "otlpInputSignal": "metricsRaw" + }, + ... + }, + ... + } + ... + } + ``` + - when produced by other modules published by **mishmash io:** + ```json + { + ... + "spec": { + "ioConfig": { + ... + "inputFormat": { + "type": "otlp", + "otlpInputSignal": "metricsFlat" + }, + ... + }, + ... + } + ... + } + ``` +- when ingesting `traces` signals: + - when produced by the **OpenTelemetry Kafka Exporter:** + ```json + { + ... + "spec": { + "ioConfig": { + ... + "inputFormat": { + "type": "otlp", + "otlpInputSignal": "tracesRaw" + }, + ... + }, + ... + } + ... + } + ``` + - when produced by other modules published by **mishmash io:** + ```json + { + ... + "spec": { + "ioConfig": { + ... + "inputFormat": { + "type": "otlp", + "otlpInputSignal": "tracesFlat" + }, + ... + }, + ... + } + ... + } + ``` +- when ingesting `profiles` signals: + - when produced by the **OpenTelemetry Kafka Exporter:** + + > `Profiles` are not yet supported by the Kafka Exporter + - when produced by other modules published by **mishmash io:** + ```json + { + ... + "spec": { + "ioConfig": { + ... + "inputFormat": { + "type": "otlp", + "otlpInputSignal": "profilesFlat" + }, + ... + }, + ... + } + ... 
+ } + ``` + +An example for a `logs` ingestion spec loading data produced by the OpenTelemetry Kafka Exporter might look like this: + +```json +{ + "type": "kafka", + "spec": { + "ioConfig": { + "type": "kafka", + "consumerProperties": { + "bootstrap.servers": "localhost:9092" + }, + "topic": "otlp_logs", + "inputFormat": { + "type": "otlp", + "otlpInputSignal": "logsRaw" + }, + "useEarliestOffset": true + }, + "tuningConfig": { + "type": "kafka" + }, + "dataSchema": { + ... + }, + "granularitySpec": { + ... + } + } + } +} +``` + +If you're not very familiar with `ingestion specs` - take a look at [Apache Druid Kafka tutorial](https://druid.apache.org/docs/latest/tutorials/tutorial-kafka) to get started. + +## Using Druid GUI console + +Druid's GUI console provides an interactive way for you to configure your ingestion jobs, without +actually writing JSON specs. + +Unfortunately, at the moment we do not support configuring the OTLP Input Format extension through +the GUI console. + +If you feel a bit brave - you can still use the GUI, with a little 'hack' - when you get to the +`Parse data` step on the GUI wizard - switch to the `Edit Spec` step where you'll see the JSON +that is being prepared. It has the same format as above and you can paste the correct `inputFormat` +parameters (take the correct config from above). Once you do that - switch back to the `Parse data` +tab and voila! + +# OpenTelemetry at mishmash io + +OpenTelemetry's main intent is the observability of production environments, but at [mishmash io](https://mishmash.io) it is part of our software development process. By saving telemetry from **experiments** and **tests** of +our own algorithms we ensure things like **performance** and **resource usage** of our distributed database, continuously and across releases. + +We believe that adopting OpenTelemetry as a software development tool might be useful to you too, which is why we decided to open-source the tools we've built. + +Learn more about the broader set of [OpenTelemetry-related activities](https://mishmash.io/open_source/opentelemetry) at +[mishmash io](https://mishmash.io/) and `follow` [GitHub profile](https://github.com/mishmash-io) for updates and new releases. 
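+
+# Histogram bucket columns
+
+One more detail worth knowing before you query ingested `metrics`: for `HISTOGRAM` and `EXPONENTIAL_HISTOGRAM` data points this input format also emits the bucket counts as nested `histogram` and `exponential_histogram` columns, where each element carries the bucket bounds together with its count (for exponential histograms the bounds are derived from the data point's scale). The numbers below are made up, but a histogram data point with explicit bounds `0.1` and `1.0` would arrive roughly in this shape:
+
+```json
+{
+  "histogram": [
+    { "lower_bound": null, "upper_bound": 0.1, "count": 4 },
+    { "lower_bound": 0.1, "upper_bound": 1.0, "count": 17 },
+    { "lower_bound": 1.0, "upper_bound": null, "count": 2 }
+  ]
+}
+```
+
+The first and last buckets are open-ended, so their missing bound is kept as `null`.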
diff --git a/examples/notebooks/basics.ipynb b/examples/notebooks/basics.ipynb index 5128f06..a99ac5e 100644 --- a/examples/notebooks/basics.ipynb +++ b/examples/notebooks/basics.ipynb @@ -19,7 +19,7 @@ "> Welcome to our quick guide to [OpenTelemetry](https://opentelemetry.io/) for Developers, Data Engineers, Data Scientists and just about everyone who prefers to learn by looking at actual data.\n", "> \n", "> In the following sections we'll introduce the various OpenTelemetry signals - `logs`, `metrics`, `traces` and the upcoming `profiles` as well as additional concepts like `resources` and `scopes`.\n", - "> Samples shown are based on actual OpenTelemetry data emitted by a [demo Astronomy webshop app](https://github.com/mishmash-io/opentelemetry-demo-to-parquet) and collected by [an OTLP Parquet collector](https://github.com/mishmash-io/opentelemetry-server-embedded/tree/main/server-parquet) developed by [mishmash.io](https://mishmash.io/).\n", + "> Samples shown are based on actual OpenTelemetry data emitted by a [demo Astronomy webshop app](https://github.com/mishmash-io/opentelemetry-demos) and collected by [an OTLP Parquet collector](https://github.com/mishmash-io/opentelemetry-server-embedded/tree/main/server-parquet) developed by [mishmash.io](https://mishmash.io/).\n", ">\n", "> At [mishmash.io](https://mishmash.io/) we use OpenTelemetry as a development tool to help us:\n", "> - develop faster and more efficient distributed database\n", @@ -43,7 +43,7 @@ "> Welcome to our quick guide to [OpenTelemetry](https://opentelemetry.io/) for Developers, Data Engineers, Data Scientists and just about everyone who prefers to learn by looking at actual data.\n", "> \n", "> In the following sections we'll introduce the various OpenTelemetry signals - `logs`, `metrics`, `traces` and the upcoming `profiles` as well as additional concepts like `resources` and `scopes`.\n", - "> Samples shown are based on actual OpenTelemetry data emitted by a [demo Astronomy webshop app](https://github.com/mishmash-io/opentelemetry-demo-to-parquet) and collected by [an OTLP Parquet collector](https://github.com/mishmash-io/opentelemetry-server-embedded/tree/main/server-parquet) developed by [mishmash.io](https://mishmash.io/).\n", + "> Samples shown are based on actual OpenTelemetry data emitted by a [demo Astronomy webshop app](https://github.com/mishmash-io/opentelemetry-demos) and collected by [an OTLP Parquet collector](https://github.com/mishmash-io/opentelemetry-server-embedded/tree/main/server-parquet) developed by [mishmash.io](https://mishmash.io/).\n", ">\n", "> At [mishmash.io](https://mishmash.io/) we use OpenTelemetry as a development tool to help us:\n", "> - develop faster and more efficient distributed database\n", @@ -6613,7 +6613,7 @@ "\n", "#### Your code can attach custom attributes\n", "\n", - "The [demo Astronomy webshop](https://github.com/mishmash-io/opentelemetry-demo-to-parquet) adds details about a product id, quantity and a user id: \n" + "The [demo Astronomy webshop](https://github.com/mishmash-io/opentelemetry-demos) adds details about a product id, quantity and a user id: \n" ], "text/plain": [ "" @@ -6628,7 +6628,7 @@ "\n", "#### Your code can attach custom attributes\n", "\n", - "The [demo Astronomy webshop](https://github.com/mishmash-io/opentelemetry-demo-to-parquet) adds details about a product id, quantity and a user id: " + "The [demo Astronomy webshop](https://github.com/mishmash-io/opentelemetry-demos) adds details about a product id, quantity and a user id: " ] }, { @@ 
-9823,7 +9823,7 @@ "separate instances working together on the same request.\n", "\n", "In the exapmle below, it's a user's shopping cart request, that required various operations like currency convertion, a price quote and fraud analysis.\n", - "As the [demo Astronomy webshop](https://github.com/mishmash-io/opentelemetry-demo-to-parquet) microservices were calling each other to fulfill the business logic behind the user's request - each one of them 'attached' the same `trace_id`\n", + "As the [demo Astronomy webshop](https://github.com/mishmash-io/opentelemetry-demos) microservices were calling each other to fulfill the business logic behind the user's request - each one of them 'attached' the same `trace_id`\n", "to its `logs`. This mechanism is known as [Context Propagation.](https://opentelemetry.io/docs/concepts/context-propagation/)\n", "\n", "Thanks to it, we can now see all log messages related to the same user's request in one single view, despite that they were emitted from completely different \n", @@ -9849,7 +9849,7 @@ "separate instances working together on the same request.\n", "\n", "In the exapmle below, it's a user's shopping cart request, that required various operations like currency convertion, a price quote and fraud analysis.\n", - "As the [demo Astronomy webshop](https://github.com/mishmash-io/opentelemetry-demo-to-parquet) microservices were calling each other to fulfill the business logic behind the user's request - each one of them 'attached' the same `trace_id`\n", + "As the [demo Astronomy webshop](https://github.com/mishmash-io/opentelemetry-demos) microservices were calling each other to fulfill the business logic behind the user's request - each one of them 'attached' the same `trace_id`\n", "to its `logs`. This mechanism is known as [Context Propagation.](https://opentelemetry.io/docs/concepts/context-propagation/)\n", "\n", "Thanks to it, we can now see all log messages related to the same user's request in one single view, despite that they were emitted from completely different \n", @@ -21721,7 +21721,7 @@ "\n", "Through OpenTelemetry instrumentation APIs your code can emit `metrics` specific to your app.\n", "\n", - "The [demo Astronomy webshop](https://github.com/mishmash-io/opentelemetry-demo-to-parquet)'s currency convertion microservice measures how much of a given currency it has converted:\n" + "The [demo Astronomy webshop](https://github.com/mishmash-io/opentelemetry-demos)'s currency convertion microservice measures how much of a given currency it has converted:\n" ], "text/plain": [ "" @@ -21738,7 +21738,7 @@ "\n", "Through OpenTelemetry instrumentation APIs your code can emit `metrics` specific to your app.\n", "\n", - "The [demo Astronomy webshop](https://github.com/mishmash-io/opentelemetry-demo-to-parquet)'s currency convertion microservice measures how much of a given currency it has converted:" + "The [demo Astronomy webshop](https://github.com/mishmash-io/opentelemetry-demos)'s currency convertion microservice measures how much of a given currency it has converted:" ] }, { @@ -40786,7 +40786,7 @@ "\n", "# Exploring traces\n", "\n", - "Let's find the same `trace_id` that we used in the `logs` correlation example above and take a deeper look at what the [demo Astronomy webshop](https://github.com/mishmash-io/opentelemetry-demo-to-parquet) was doing:\n", + "Let's find the same `trace_id` that we used in the `logs` correlation example above and take a deeper look at what the [demo Astronomy 
webshop](https://github.com/mishmash-io/opentelemetry-demos) was doing:\n", "\n", "> ***Note:*** Look at the `parent_span_id` values and the rows they point to to see how spans get subdivided\n" ], @@ -40803,7 +40803,7 @@ "\n", "# Exploring traces\n", "\n", - "Let's find the same `trace_id` that we used in the `logs` correlation example above and take a deeper look at what the [demo Astronomy webshop](https://github.com/mishmash-io/opentelemetry-demo-to-parquet) was doing:\n", + "Let's find the same `trace_id` that we used in the `logs` correlation example above and take a deeper look at what the [demo Astronomy webshop](https://github.com/mishmash-io/opentelemetry-demos) was doing:\n", "\n", "> ***Note:*** Look at the `parent_span_id` values and the rows they point to to see how spans get subdivided" ] @@ -46166,7 +46166,7 @@ "> Examples here will almost certainly change as development progresses.\n", ">\n", "> To collect the data for the following sections we used an early version of [OpenTelemetry eBPF Profiler,](https://github.com/open-telemetry/opentelemetry-ebpf-profiler) running it separately.\n", - "> This means that the DataFrames below contain data from the [demo Astronomy webshop](https://github.com/mishmash-io/opentelemetry-demo-to-parquet) (same demo webshop as in the examples of other signals above) and a number of other processes running on the same host.\n", + "> This means that the DataFrames below contain data from the [demo Astronomy webshop](https://github.com/mishmash-io/opentelemetry-demos) (same demo webshop as in the examples of other signals above) and a number of other processes running on the same host.\n", "\n", "The last signal type in this demo is the `profiles` signal and its a new extension to OpenTelemetry. The intent of performance profiling is to help you analyze the performance of your app and find code \n", "sections to improve.\n", @@ -46216,7 +46216,7 @@ "> Examples here will almost certainly change as development progresses.\n", ">\n", "> To collect the data for the following sections we used an early version of [OpenTelemetry eBPF Profiler,](https://github.com/open-telemetry/opentelemetry-ebpf-profiler) running it separately.\n", - "> This means that the DataFrames below contain data from the [demo Astronomy webshop](https://github.com/mishmash-io/opentelemetry-demo-to-parquet) (same demo webshop as in the examples of other signals above) and a number of other processes running on the same host.\n", + "> This means that the DataFrames below contain data from the [demo Astronomy webshop](https://github.com/mishmash-io/opentelemetry-demos) (same demo webshop as in the examples of other signals above) and a number of other processes running on the same host.\n", "\n", "The last signal type in this demo is the `profiles` signal and its a new extension to OpenTelemetry. 
The intent of performance profiling is to help you analyze the performance of your app and find code \n", "sections to improve.\n", diff --git a/examples/notebooks/metrics.ipynb b/examples/notebooks/metrics.ipynb index 13f5d12..b3dce06 100644 --- a/examples/notebooks/metrics.ipynb +++ b/examples/notebooks/metrics.ipynb @@ -20,7 +20,7 @@ ">\n", "> It is strongly recommended that you first go through the [basics of OpenTelemetry data.](basics.ipynb)\n", "\n", - "In this demo notebook we'll further process OpenTelemetry `metrics` data that was emitted by a [demo Astronomy webshop](https://github.com/mishmash-io/opentelemetry-demo-to-parquet) and recorded into Apache Parquet files by \n", + "In this demo notebook we'll further process OpenTelemetry `metrics` data that was emitted by a [demo Astronomy webshop](https://github.com/mishmash-io/opentelemetry-demos) and recorded into Apache Parquet files by \n", "the [OTLP Parquet Server](https://github.com/mishmash-io/opentelemetry-server-embedded/tree/main/server-parquet).\n", "\n", "We'll be using data that is not in the ***raw*** OpenTelemetry format. [The basics notebook](basics.ipynb) does some preprocessing on the raw data\n", @@ -46,7 +46,7 @@ ">\n", "> It is strongly recommended that you first go through the [basics of OpenTelemetry data.](basics.ipynb)\n", "\n", - "In this demo notebook we'll further process OpenTelemetry `metrics` data that was emitted by a [demo Astronomy webshop](https://github.com/mishmash-io/opentelemetry-demo-to-parquet) and recorded into Apache Parquet files by \n", + "In this demo notebook we'll further process OpenTelemetry `metrics` data that was emitted by a [demo Astronomy webshop](https://github.com/mishmash-io/opentelemetry-demos) and recorded into Apache Parquet files by \n", "the [OTLP Parquet Server](https://github.com/mishmash-io/opentelemetry-server-embedded/tree/main/server-parquet).\n", "\n", "We'll be using data that is not in the ***raw*** OpenTelemetry format. [The basics notebook](basics.ipynb) does some preprocessing on the raw data\n", @@ -7188,7 +7188,7 @@ "\n", "#### Emitters of java memory metrics\n", "\n", - "The [demo Astronomy webshop](https://github.com/mishmash-io/opentelemetry-demo-to-parquet) used to obtain our data set has a few Java services:\n" + "The [demo Astronomy webshop](https://github.com/mishmash-io/opentelemetry-demos) used to obtain our data set has a few Java services:\n" ], "text/plain": [ "" @@ -7203,7 +7203,7 @@ "\n", "#### Emitters of java memory metrics\n", "\n", - "The [demo Astronomy webshop](https://github.com/mishmash-io/opentelemetry-demo-to-parquet) used to obtain our data set has a few Java services:" + "The [demo Astronomy webshop](https://github.com/mishmash-io/opentelemetry-demos) used to obtain our data set has a few Java services:" ] }, { @@ -10472,7 +10472,7 @@ "- ***I'm measuring the quality of service delivered to my users?***\n", "\n", " In the `HISTOGRAM` data points example above we used a metric of how long it takes to handle an incoming HTTP request. For a web app (such as \n", - " the [demo Astronomy webshop](https://github.com/mishmash-io/opentelemetry-demo-to-parquet) used to capture this metrics data set) this is an important metric that directly observes the quality users get. Or in other \n", + " the [demo Astronomy webshop](https://github.com/mishmash-io/opentelemetry-demos) used to capture this metrics data set) this is an important metric that directly observes the quality users get. 
Or in other \n", " words, it's a metric that captures an aspect of the final **output** if your app, an aspect of what it **delivers.**\n", "\n", " In these situations you might want to use a `HISTOGRAM`, because of the greater detail that it offers. It is a safer choice - in your initial \n", @@ -10562,7 +10562,7 @@ "- ***I'm measuring the quality of service delivered to my users?***\n", "\n", " In the `HISTOGRAM` data points example above we used a metric of how long it takes to handle an incoming HTTP request. For a web app (such as \n", - " the [demo Astronomy webshop](https://github.com/mishmash-io/opentelemetry-demo-to-parquet) used to capture this metrics data set) this is an important metric that directly observes the quality users get. Or in other \n", + " the [demo Astronomy webshop](https://github.com/mishmash-io/opentelemetry-demos) used to capture this metrics data set) this is an important metric that directly observes the quality users get. Or in other \n", " words, it's a metric that captures an aspect of the final **output** if your app, an aspect of what it **delivers.**\n", "\n", " In these situations you might want to use a `HISTOGRAM`, because of the greater detail that it offers. It is a safer choice - in your initial \n",