Commit

Merge pull request #32 from mishmash-io/publish-druid
Publish Apache Druid input format skeleton implementation
arusevm authored Oct 8, 2024
2 parents ccc6ef7 + a97e825 commit f3f59fd
Showing 32 changed files with 3,537 additions and 558 deletions.
19 changes: 18 additions & 1 deletion README.md
@@ -35,6 +35,24 @@ Parquet files as saved by this Stand-alone server.
- [README](./server-parquet)
- [Javadoc on javadoc.io](https://javadoc.io/doc/io.mishmash.opentelemetry/server-parquet)

## Apache Druid Input Format

Use this artifact to ingest OpenTelemetry signals into [Apache Druid](https://druid.apache.org), in combination with an input source such as Apache Kafka.

Apache Druid is a high-performance, real-time analytics database that delivers sub-second queries on streaming and batch data at scale and under load. This makes it well suited to OpenTelemetry data analytics.

With this OTLP Input Format you can build OpenTelemetry ingestion pipelines into Apache
Druid. For example:
- Use the [OpenTelemetry Kafka Exporter](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/kafkaexporter/README.md) to publish
OTLP signals to an Apache Kafka topic, then use [Druid Kafka Ingestion](https://druid.apache.org/docs/latest/ingestion/kafka-ingestion/) with this Input Format to load your telemetry into Druid
tables.
- Similarly, you can use other Druid input sources developed by mishmash io,
such as those for [Apache BookKeeper](https://bookkeeper.apache.org) or [Apache Pulsar](https://pulsar.apache.org). For details, check the related artifact documentation.

Find out more about the OTLP Input Format for Apache Druid:
- [README](./druid-input-format)
- [Javadoc on javadoc.io](https://javadoc.io/doc/io.mishmash.opentelemetry/druid-input-format)

# OpenTelemetry at mishmash io

OpenTelemetry's main intent is the observability of production environments, but at [mishmash io](https://mishmash.io) it is part of our software development process. By saving telemetry from **experiments** and **tests** of
@@ -44,4 +62,3 @@ We believe that adopting OpenTelemetry as a software development tool might be u

Learn more about the broader set of [OpenTelemetry-related activities](https://mishmash.io/open_source/opentelemetry) at
[mishmash io](https://mishmash.io/) and follow our [GitHub profile](https://github.com/mishmash-io) for updates and new releases.

10 changes: 10 additions & 0 deletions collector-embedded/pom.xml
@@ -90,6 +90,10 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
</plugin>
</plugins>
</build>

@@ -114,6 +118,12 @@
<groupId>io.opentelemetry.proto</groupId>
<artifactId>opentelemetry-proto</artifactId>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
@@ -17,7 +17,6 @@

package io.mishmash.opentelemetry.server.collector;

import java.util.UUID;
import java.util.concurrent.ForkJoinPool;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -29,9 +28,6 @@
import io.opentelemetry.proto.collector.logs.v1.ExportLogsPartialSuccess;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse;
import io.opentelemetry.proto.logs.v1.LogRecord;
import io.opentelemetry.proto.logs.v1.ResourceLogs;
import io.opentelemetry.proto.logs.v1.ScopeLogs;
import io.vertx.core.Vertx;

/**
@@ -94,94 +90,73 @@ public Batch<Log> loadBatch(

Batch<Log> batch = new Batch<>(otelContext);

long timestamp = System.currentTimeMillis();
String uuid = UUID.randomUUID().toString();

int requestItems = 0;

for (ResourceLogs logs : request.getResourceLogsList()) {
for (ScopeLogs scopeLogs : logs.getScopeLogsList()) {
for (LogRecord log : scopeLogs.getLogRecordsList()) {
if (batch.isCancelled()) {
return batch;
}
for (Log l : new LogsFlattener(
batch,
Context.current(),
request,
Vertx.currentContext().get(VCTX_EMITTER))) {
if (batch.isCancelled()) {
return batch;
}

/*
* FIXME: check if is valid and add an error message
* (but still allow it to go to subscribers)
*/

Span recordSpan = getInstrumentation()
.startNewSpan("otel.record");
requestItems++;
l.addAll(getSubscribers());
l.setLoaded();
batch.add(l);

Log l = new Log(batch,
Context.current(),
Vertx.currentContext().get(VCTX_EMITTER));
l.setFrom(
timestamp,
uuid,
requestItems++,
logs,
scopeLogs,
log);

/*
* FIXME: check if is valid and add an error message
* (but still allow it to go to subscribers)
*/

l.addAll(getSubscribers());
l.setLoaded();
batch.add(l);

recordSpan.addEvent("Request item loaded");

int estimatedLag = offer(
l,
(subscriber, droppedItem) -> {
/*
* set an error on this in the response,
* FIXME: use another exception class
*/
droppedItem.completeExceptionally(
new RuntimeException(
"Logs collector subscriber "
+ subscriber
+ " dropped a log record"));

// droppedItem.complete(subscriber);
// do not retry
return false;
});

if (estimatedLag < 0) {
// says how many subscribers dropped the message
LOG.info(
String.format(
"Logs batch %s has %d drop(s)",
uuid,
(-estimatedLag)));
addDroppedRequestItems(
(-estimatedLag),
transport,
encoding);
} else if (estimatedLag == 0) {
// there were no subscribers, set an error
batch.setLoadFailed(
new IllegalStateException("""
Logs collector currently has \
no subscribers"""));
LOG.log(Level.SEVERE, """
Logs batch load failed, logs collector \
currently has no subscribers. \
Batch id: """
+ uuid);

return batch;
// } else {
int estimatedLag = offer(
l,
(subscriber, droppedItem) -> {
/*
* positive number is the estimated lag - number
* of items submitted but not yet consumed
* set an error on this in the response,
* FIXME: use another exception class
*/

// LOG.info("Logs estimated lag: " + estimatedLag);
}
}
droppedItem.completeExceptionally(
new RuntimeException(
"Logs collector subscriber "
+ subscriber
+ " dropped a log record"));

// droppedItem.complete(subscriber);
// do not retry
return false;
});

if (estimatedLag < 0) {
// says how many subscribers dropped the message
LOG.info(
String.format(
"Logs batch has %d drop(s)",
(-estimatedLag)));
addDroppedRequestItems(
(-estimatedLag),
transport,
encoding);
} else if (estimatedLag == 0) {
// there were no subscribers, set an error
batch.setLoadFailed(
new IllegalStateException("""
Logs collector currently has \
no subscribers"""));
LOG.log(Level.SEVERE, """
Logs batch load failed, logs collector \
currently has no subscribers. """);

return batch;
// } else {
/*
* positive number is the estimated lag - number
* of items submitted but not yet consumed
*/

// LOG.info("Logs estimated lag: " + estimatedLag);
}
}
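
The hunk above replaces three nested loops over `ResourceLogs`, `ScopeLogs` and `LogRecord` with a single loop over a `LogsFlattener`. The flattener's implementation is not part of this excerpt, so the following is only a minimal sketch of the general technique, assuming a plain `Iterable` that walks the nesting lazily and numbers each record. `FlattenerSketch`, `Resource`, `Scope` and `FlatRecord` are hypothetical stand-ins, not the project's classes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class FlattenerSketch {

    // Hypothetical stand-ins for the nested OTLP messages.
    public record Scope(String name, List<String> records) { }
    public record Resource(String name, List<Scope> scopes) { }
    // One flat item per record, carrying its parents and a sequence number.
    public record FlatRecord(int seqNo, String resource, String scope, String body) { }

    /** Exposes a resource -> scope -> record hierarchy as one flat sequence. */
    public static final class Flattener implements Iterable<FlatRecord> {
        private final List<Resource> resources;

        public Flattener(List<Resource> resources) {
            this.resources = resources;
        }

        @Override
        public Iterator<FlatRecord> iterator() {
            return new Iterator<>() {
                private int r = 0, s = 0, i = 0, seq = 0;

                // Advance the cursors past exhausted or empty scopes and resources.
                private void skipEmpty() {
                    while (r < resources.size()) {
                        List<Scope> scopes = resources.get(r).scopes();
                        if (s >= scopes.size()) {
                            r++; s = 0; i = 0;
                        } else if (i >= scopes.get(s).records().size()) {
                            s++; i = 0;
                        } else {
                            return;
                        }
                    }
                }

                @Override
                public boolean hasNext() {
                    skipEmpty();
                    return r < resources.size();
                }

                @Override
                public FlatRecord next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    Resource res = resources.get(r);
                    Scope sc = res.scopes().get(s);
                    return new FlatRecord(seq++, res.name(), sc.name(), sc.records().get(i++));
                }
            };
        }
    }

    /** Caller side: a single loop, as in the refactored loadBatch(). */
    public static List<String> flatten(List<Resource> resources) {
        List<String> out = new ArrayList<>();
        for (FlatRecord f : new Flattener(resources)) {
            out.add(f.seqNo() + ":" + f.resource() + "/" + f.scope() + "/" + f.body());
        }
        return out;
    }
}
```

Keeping the traversal in one iterable means the same flattening logic can be shared by any caller that needs flat records, which is presumably why the commit moves it out of the collector.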

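
The three-way handling of `estimatedLag` above matches the return contract of `java.util.concurrent.SubmissionPublisher.offer(item, onDrop)`: a negative value is the negated number of drops, and otherwise the value is an estimate of the maximum lag, which is zero when nobody is subscribed. Whether the collector actually extends `SubmissionPublisher` is an assumption, since the class declaration is not in this excerpt. The sketch below uses only the JDK class; `OfferSketch` and its methods are hypothetical names.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class OfferSketch {

    /** Interprets the value returned by SubmissionPublisher.offer(). */
    public static String classify(int result) {
        if (result < 0) {
            return "dropped by " + (-result) + " subscriber(s)";
        } else if (result == 0) {
            return "no subscribers";
        }
        return "accepted, estimated lag " + result;
    }

    /** With no subscribers there is nothing to lag behind, so offer() returns 0. */
    public static int offerWithoutSubscribers() {
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            return publisher.offer("log record", (subscriber, item) -> false);
        }
    }

    /**
     * Offers items to a subscriber that never requests any. Once its buffer
     * is full the onDrop handler declines to retry, and offer() reports the
     * drop as a negative count.
     */
    public static int offerUntilDropped() {
        // Run subscriber tasks in the calling thread; buffer at most one item.
        try (SubmissionPublisher<String> publisher =
                new SubmissionPublisher<>(Runnable::run, 1)) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override public void onSubscribe(Flow.Subscription s) { /* never requests */ }
                @Override public void onNext(String item) { }
                @Override public void onError(Throwable t) { }
                @Override public void onComplete() { }
            });
            for (int i = 0; i < 1_000; i++) {
                int result = publisher.offer(
                        "log record " + i,
                        (subscriber, item) -> false); // do not retry
                if (result < 0) {
                    return result;
                }
            }
            throw new IllegalStateException("buffer never saturated");
        }
    }
}
```

Returning `false` from the `onDrop` handler, as the collector does, means a saturated subscriber loses the item instead of blocking the publisher.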