Publish Apache Druid input format skeleton implementation #32

Merged 2 commits on Oct 8, 2024.

Changes from all commits:
README.md: 19 changes (18 additions, 1 deletion)

@@ -35,6 +35,24 @@ Parquet files as saved by this Stand-alone server.
- [README](./server-parquet)
- [Javadoc on javadoc.io](https://javadoc.io/doc/io.mishmash.opentelemetry/server-parquet)

## Apache Druid Input Format

Use this artifact when ingesting OpenTelemetry signals into [Apache Druid](https://druid.apache.org), in combination with an input source such as Apache Kafka.

Apache Druid is a high-performance, real-time analytics database that delivers sub-second queries on streaming and batch data at scale and under load. That makes it a great fit for OpenTelemetry data analytics.

With this OTLP Input Format you can build OpenTelemetry ingestion pipelines into Apache
Druid. For example:
- Use the [OpenTelemetry Kafka Exporter](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/kafkaexporter/README.md) to publish
OTLP signals to an Apache Kafka topic, then use [Druid Kafka Ingestion](https://druid.apache.org/docs/latest/ingestion/kafka-ingestion/) with this Input Format to get Druid
tables with your telemetry.
- Similarly, you can use other Druid input sources developed by mishmash io,
such as those for [Apache BookKeeper](https://bookkeeper.apache.org) or [Apache Pulsar](https://pulsar.apache.org). For details, check the related artifact's documentation.
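As a sketch of how the pieces could fit together, a Druid Kafka supervisor spec would reference this Input Format in its `ioConfig`. Note that the `inputFormat` type string, topic, datasource name and timestamp column below are illustrative placeholders, not values confirmed by this repository; check the [druid-input-format README](./druid-input-format) for the real ones:

```json
{
  "type": "kafka",
  "spec": {
    "ioConfig": {
      "type": "kafka",
      "consumerProperties": { "bootstrap.servers": "kafka:9092" },
      "topic": "<your-otlp-topic>",
      "inputFormat": { "type": "<otlp-input-format-type>" }
    },
    "dataSchema": {
      "dataSource": "<your-datasource>",
      "timestampSpec": { "column": "<timestamp-column>", "format": "auto" },
      "dimensionsSpec": { "useSchemaDiscovery": true }
    },
    "tuningConfig": { "type": "kafka" }
  }
}
```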

Find out more about the OTLP Input Format for Apache Druid:
- [README](./druid-input-format)
- [Javadoc on javadoc.io](https://javadoc.io/doc/io.mishmash.opentelemetry/druid-input-format)

# OpenTelemetry at mishmash io

OpenTelemetry's main intent is the observability of production environments, but at [mishmash io](https://mishmash.io) it is part of our software development process. By saving telemetry from **experiments** and **tests** of
@@ -44,4 +62,3 @@ We believe that adopting OpenTelemetry as a software development tool might be u

Learn more about the broader set of [OpenTelemetry-related activities](https://mishmash.io/open_source/opentelemetry) at
[mishmash io](https://mishmash.io/) and `follow` our [GitHub profile](https://github.com/mishmash-io) for updates and new releases.

collector-embedded/pom.xml: 10 changes (10 additions, 0 deletions)

@@ -90,6 +90,10 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
</plugin>
</plugins>
</build>

@@ -114,6 +118,12 @@
<groupId>io.opentelemetry.proto</groupId>
<artifactId>opentelemetry-proto</artifactId>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
@@ -17,7 +17,6 @@
 
 package io.mishmash.opentelemetry.server.collector;
 
-import java.util.UUID;
 import java.util.concurrent.ForkJoinPool;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -29,9 +28,6 @@
 import io.opentelemetry.proto.collector.logs.v1.ExportLogsPartialSuccess;
 import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
 import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse;
-import io.opentelemetry.proto.logs.v1.LogRecord;
-import io.opentelemetry.proto.logs.v1.ResourceLogs;
-import io.opentelemetry.proto.logs.v1.ScopeLogs;
 import io.vertx.core.Vertx;
 
@@ -94,94 +90,73 @@ public Batch<Log> loadBatch(
 
         Batch<Log> batch = new Batch<>(otelContext);
 
-        long timestamp = System.currentTimeMillis();
-        String uuid = UUID.randomUUID().toString();
-
         int requestItems = 0;
 
-        for (ResourceLogs logs : request.getResourceLogsList()) {
-            for (ScopeLogs scopeLogs : logs.getScopeLogsList()) {
-                for (LogRecord log : scopeLogs.getLogRecordsList()) {
-                    if (batch.isCancelled()) {
-                        return batch;
-                    }
-
-                    Span recordSpan = getInstrumentation()
-                            .startNewSpan("otel.record");
-
-                    Log l = new Log(batch,
-                            Context.current(),
-                            Vertx.currentContext().get(VCTX_EMITTER));
-                    l.setFrom(
-                            timestamp,
-                            uuid,
-                            requestItems++,
-                            logs,
-                            scopeLogs,
-                            log);
-
-                    /*
-                     * FIXME: check if is valid and add an error message
-                     * (but still allow it to go to subscribers)
-                     */
-
-                    l.addAll(getSubscribers());
-                    l.setLoaded();
-                    batch.add(l);
-
-                    recordSpan.addEvent("Request item loaded");
-
-                    int estimatedLag = offer(
-                            l,
-                            (subscriber, droppedItem) -> {
-                                /*
-                                 * set an error on this in the response,
-                                 * FIXME: use another exception class
-                                 */
-                                droppedItem.completeExceptionally(
-                                        new RuntimeException(
-                                                "Logs collector subscriber "
-                                                + subscriber
-                                                + " dropped a log record"));
-
-                                // droppedItem.complete(subscriber);
-                                // do not retry
-                                return false;
-                            });
-
-                    if (estimatedLag < 0) {
-                        // says how many subscribers dropped the message
-                        LOG.info(
-                                String.format(
-                                        "Logs batch %s has %d drop(s)",
-                                        uuid,
-                                        (-estimatedLag)));
-                        addDroppedRequestItems(
-                                (-estimatedLag),
-                                transport,
-                                encoding);
-                    } else if (estimatedLag == 0) {
-                        // there were no subscribers, set an error
-                        batch.setLoadFailed(
-                                new IllegalStateException("""
-                                        Logs collector currently has \
-                                        no subscribers"""));
-                        LOG.log(Level.SEVERE, """
-                                Logs batch load failed, logs collector \
-                                currently has no subscribers. \
-                                Batch id: """
-                                + uuid);
-
-                        return batch;
-                    // } else {
-                        /*
-                         * positive number is the estimated lag - number
-                         * of items submitted but not yet consumed
-                         */
-
-                        // LOG.info("Logs estimated lag: " + estimatedLag);
-                    }
-                }
-            }
+        for (Log l : new LogsFlattener(
+                batch,
+                Context.current(),
+                request,
+                Vertx.currentContext().get(VCTX_EMITTER))) {
+            if (batch.isCancelled()) {
+                return batch;
+            }
+
+            /*
+             * FIXME: check if is valid and add an error message
+             * (but still allow it to go to subscribers)
+             */
+
+            requestItems++;
+            l.addAll(getSubscribers());
+            l.setLoaded();
+            batch.add(l);
+
+            int estimatedLag = offer(
+                    l,
+                    (subscriber, droppedItem) -> {
+                        /*
+                         * set an error on this in the response,
+                         * FIXME: use another exception class
+                         */
+                        droppedItem.completeExceptionally(
+                                new RuntimeException(
+                                        "Logs collector subscriber "
+                                        + subscriber
+                                        + " dropped a log record"));
+
+                        // droppedItem.complete(subscriber);
+                        // do not retry
+                        return false;
+                    });
+
+            if (estimatedLag < 0) {
+                // says how many subscribers dropped the message
+                LOG.info(
+                        String.format(
+                                "Logs batch has %d drop(s)",
+                                (-estimatedLag)));
+                addDroppedRequestItems(
+                        (-estimatedLag),
+                        transport,
+                        encoding);
+            } else if (estimatedLag == 0) {
+                // there were no subscribers, set an error
+                batch.setLoadFailed(
+                        new IllegalStateException("""
+                                Logs collector currently has \
+                                no subscribers"""));
+                LOG.log(Level.SEVERE, """
+                        Logs batch load failed, logs collector \
+                        currently has no subscribers. """);
+
+                return batch;
+            // } else {
+                /*
+                 * positive number is the estimated lag - number
+                 * of items submitted but not yet consumed
+                 */
+
+                // LOG.info("Logs estimated lag: " + estimatedLag);
+            }
         }
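This change swaps the triple-nested loop over `ResourceLogs` / `ScopeLogs` / `LogRecord` for a single `LogsFlattener` iterable. The flattener class itself is not part of this diff; as a rough sketch of the pattern it implements, with hypothetical record types standing in for the OTLP protobuf classes:

```java
import java.util.List;

public class FlattenSketch {

    // Hypothetical stand-ins for ResourceLogs, ScopeLogs and LogRecord.
    record Rec(String body) { }
    record Scope(List<Rec> records) { }
    record Resource(List<Scope> scopes) { }

    // One flat sequence instead of three nested for-loops: each
    // leaf record is visited in order, exactly once.
    static List<Rec> flatten(List<Resource> resources) {
        return resources.stream()
                .flatMap(resource -> resource.scopes().stream())
                .flatMap(scope -> scope.records().stream())
                .toList();
    }

    public static void main(String[] args) {
        List<Resource> request = List.of(
                new Resource(List.of(
                        new Scope(List.of(new Rec("a"), new Rec("b"))),
                        new Scope(List.of(new Rec("c"))))));

        System.out.println(flatten(request).size()); // prints 3
    }
}
```

Wrapping such a stream in an `Iterable` (as `LogsFlattener` appears to do) keeps `loadBatch()` down to one loop with a single early-exit cancellation check.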

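The `offer()` semantics in `loadBatch()` (a negative return counts drops, zero means no subscribers, a positive value estimates lag, and returning `false` from the handler means do not retry) match the JDK Flow API's `java.util.concurrent.SubmissionPublisher.offer(item, onDrop)`. Assuming that is what backs the collector's publisher, a minimal standalone illustration:

```java
import java.util.concurrent.SubmissionPublisher;

public class OfferSketch {

    public static void main(String[] args) {
        try (SubmissionPublisher<String> publisher =
                new SubmissionPublisher<>()) {

            // offer() returns: negative -> number of drops,
            // zero -> no subscribers, positive -> estimated maximum
            // lag (items submitted but not yet consumed).
            int estimatedLag = publisher.offer(
                    "log-record",
                    (subscriber, droppedItem) -> false); // do not retry

            if (estimatedLag == 0) {
                // mirrors the collector's "no subscribers" failure path
                System.out.println("no subscribers");
            }
        }
    }
}
```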