Skip to content

Commit

Permalink
Merge pull request #33 from mishmash-io/publish-druid
Browse files Browse the repository at this point in the history
Publish druid input format implementation
  • Loading branch information
arusevm authored Oct 10, 2024
2 parents f3f59fd + ac8b168 commit cbbd967
Show file tree
Hide file tree
Showing 22 changed files with 876 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@

import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.IntermediateRowParsingReader;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.java.util.common.parsers.CloseableIteratorWithMetadata;
import org.apache.druid.java.util.common.parsers.ParseException;

import io.mishmash.opentelemetry.persistence.proto.ProtobufLogs;
import io.mishmash.opentelemetry.persistence.proto.v1.LogsPersistenceProto.PersistedLog;
import io.mishmash.opentelemetry.persistence.protobuf.ProtobufLogs;
import io.mishmash.opentelemetry.server.collector.Log;
import io.mishmash.opentelemetry.server.collector.LogsFlattener;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
Expand All @@ -53,17 +55,24 @@ public class LogsReader extends IntermediateRowParsingReader<PersistedLog> {
* True if the 'raw' format was configured.
*/
private boolean isRaw = false;
/**
* The ingestion schema config.
*/
private InputRowSchema schema;

/**
* Create an OTLP logs reader.
*
* @param rowSchema the schema as set in ingestion config
* @param input the {@link InputEntity} containing protobuf-encoded bytes
* @param isRawFormat true if input contains a 'raw'
* {@link ExportLogsServiceRequest}
*/
public LogsReader(
final InputRowSchema rowSchema,
final InputEntity input,
final boolean isRawFormat) {
this.schema = rowSchema;
this.source = input;
this.isRaw = isRawFormat;
}
Expand All @@ -75,8 +84,10 @@ public LogsReader(
protected List<InputRow> parseInputRows(
final PersistedLog intermediateRow)
throws IOException, ParseException {
// TODO Auto-generated method stub
return null;
return Collections.singletonList(
MapInputRowParser.parse(
schema,
ProtobufLogs.toJsonMap(intermediateRow)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@

import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.IntermediateRowParsingReader;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.java.util.common.parsers.CloseableIteratorWithMetadata;
import org.apache.druid.java.util.common.parsers.ParseException;

import io.mishmash.opentelemetry.persistence.proto.ProtobufMetrics;
import io.mishmash.opentelemetry.persistence.proto.v1.MetricsPersistenceProto.PersistedMetric;
import io.mishmash.opentelemetry.persistence.protobuf.ProtobufMetrics;
import io.mishmash.opentelemetry.server.collector.MetricDataPoint;
import io.mishmash.opentelemetry.server.collector.MetricsFlattener;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
Expand All @@ -54,17 +56,24 @@ public class MetricsReader
* True if the 'raw' format was configured.
*/
private boolean isRaw = false;
/**
* The ingestion schema config.
*/
private InputRowSchema schema;

/**
* Create an OTLP metrics reader.
*
* @param rowSchema the schema as set in ingestion config
* @param input the {@link InputEntity} containing protobuf-encoded bytes
* @param isRawFormat true if input contains a 'raw'
* {@link ExportMetricsServiceRequest}
*/
public MetricsReader(
final InputRowSchema rowSchema,
final InputEntity input,
final boolean isRawFormat) {
this.schema = rowSchema;
this.source = input;
this.isRaw = isRawFormat;
}
Expand All @@ -76,8 +85,10 @@ public MetricsReader(
protected List<InputRow> parseInputRows(
final PersistedMetric intermediateRow)
throws IOException, ParseException {
// TODO Auto-generated method stub
return null;
return Collections.singletonList(
MapInputRowParser.parse(
schema,
ProtobufMetrics.toJsonMap(intermediateRow)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,29 +123,23 @@ public InputEntityReader createReader(
final InputRowSchema inputRowSchema,
final InputEntity source,
final File temporaryDirectory) {
// use to filter out columns:
inputRowSchema.getColumnsFilter();
inputRowSchema.getDimensionsSpec();
inputRowSchema.getTimestampSpec();
inputRowSchema.getMetricNames();

switch (getOtlpInputSignal()) {
case LOGS_FLAT:
return new LogsReader(source, false);
return new LogsReader(inputRowSchema, source, false);
case LOGS_RAW:
return new LogsReader(source, true);
return new LogsReader(inputRowSchema, source, true);
case METRICS_FLAT:
return new MetricsReader(source, false);
return new MetricsReader(inputRowSchema, source, false);
case METRICS_RAW:
return new MetricsReader(source, true);
return new MetricsReader(inputRowSchema, source, true);
case TRACES_FLAT:
return new TracesReader(source, false);
return new TracesReader(inputRowSchema, source, false);
case TRACES_RAW:
return new TracesReader(source, true);
return new TracesReader(inputRowSchema, source, true);
case PROFILES_FLAT:
return new ProfilesReader(source, false);
return new ProfilesReader(inputRowSchema, source, false);
case PROFILES_RAW:
return new ProfilesReader(source, true);
return new ProfilesReader(inputRowSchema, source, true);
default:
// should not happen
throw new UnsupportedOperationException("Internal error");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@

import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.IntermediateRowParsingReader;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.java.util.common.parsers.CloseableIteratorWithMetadata;
import org.apache.druid.java.util.common.parsers.ParseException;

import io.mishmash.opentelemetry.persistence.proto.ProtobufProfiles;
import io.mishmash.opentelemetry.persistence.proto.v1.ProfilesPersistenceProto.PersistedProfile;
import io.mishmash.opentelemetry.persistence.protobuf.ProtobufProfiles;
import io.mishmash.opentelemetry.server.collector.ProfileSampleValue;
import io.mishmash.opentelemetry.server.collector.ProfilesFlattener;
import io.opentelemetry.proto.collector.profiles.v1experimental.ExportProfilesServiceRequest;
Expand All @@ -54,17 +56,24 @@ public class ProfilesReader
* True if the 'raw' format was configured.
*/
private boolean isRaw = false;
/**
* The ingestion schema config.
*/
private InputRowSchema schema;

/**
* Create an OTLP profiles reader.
*
* @param rowSchema the schema as set in ingestion config
* @param input the {@link InputEntity} containing protobuf-encoded bytes
* @param isRawFormat true if input contains a 'raw'
* {@link ExportProfilesServiceRequest}
*/
public ProfilesReader(
final InputRowSchema rowSchema,
final InputEntity input,
final boolean isRawFormat) {
this.schema = rowSchema;
this.source = input;
this.isRaw = isRawFormat;
}
Expand All @@ -76,8 +85,10 @@ public ProfilesReader(
protected List<InputRow> parseInputRows(
final PersistedProfile intermediateRow)
throws IOException, ParseException {
// TODO Auto-generated method stub
return null;
return Collections.singletonList(
MapInputRowParser.parse(
schema,
ProtobufProfiles.toJsonMap(intermediateRow)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@

import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.IntermediateRowParsingReader;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.java.util.common.parsers.CloseableIteratorWithMetadata;
import org.apache.druid.java.util.common.parsers.ParseException;

import io.mishmash.opentelemetry.persistence.proto.ProtobufSpans;
import io.mishmash.opentelemetry.persistence.proto.v1.TracesPersistenceProto.PersistedSpan;
import io.mishmash.opentelemetry.persistence.protobuf.ProtobufSpans;
import io.mishmash.opentelemetry.server.collector.Span;
import io.mishmash.opentelemetry.server.collector.TracesFlattener;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
Expand All @@ -53,17 +55,24 @@ public class TracesReader extends IntermediateRowParsingReader<PersistedSpan> {
* True if the 'raw' format was configured.
*/
private boolean isRaw = false;
/**
* The ingestion schema config.
*/
private InputRowSchema schema;

/**
* Create an OTLP logs reader.
* Create an OTLP traces reader.
*
* @param rowSchema the schema as set in ingestion config
* @param input the {@link InputEntity} containing protobuf-encoded bytes
* @param isRawFormat true if input contains a 'raw'
* {@link ExportTraceServiceRequest}
*/
public TracesReader(
final InputRowSchema rowSchema,
final InputEntity input,
final boolean isRawFormat) {
this.schema = rowSchema;
this.source = input;
this.isRaw = isRawFormat;
}
Expand All @@ -75,8 +84,10 @@ public TracesReader(
protected List<InputRow> parseInputRows(
final PersistedSpan intermediateRow)
throws IOException, ParseException {
// TODO Auto-generated method stub
return null;
return Collections.singletonList(
MapInputRowParser.parse(
schema,
ProtobufSpans.toJsonMap(intermediateRow)));
}

/**
Expand Down
6 changes: 6 additions & 0 deletions persistence-protobuf/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@
<artifactId>collector-embedded</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*
*/

package io.mishmash.opentelemetry.persistence.proto;
package io.mishmash.opentelemetry.persistence.protobuf;

import java.util.Map;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*
*/

package io.mishmash.opentelemetry.persistence.proto;
package io.mishmash.opentelemetry.persistence.protobuf;

import java.util.Map;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*
*/

package io.mishmash.opentelemetry.persistence.proto;
package io.mishmash.opentelemetry.persistence.protobuf;

import java.util.ArrayList;
import java.util.Collections;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*
*/

package io.mishmash.opentelemetry.persistence.proto;
package io.mishmash.opentelemetry.persistence.protobuf;

import java.util.Map;

Expand Down
Loading

0 comments on commit cbbd967

Please sign in to comment.