diff --git a/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/LogsReader.java b/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/LogsReader.java index 4556b81..f7f8b02 100644 --- a/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/LogsReader.java +++ b/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/LogsReader.java @@ -25,12 +25,14 @@ import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.IntermediateRowParsingReader; +import org.apache.druid.data.input.impl.MapInputRowParser; import org.apache.druid.java.util.common.parsers.CloseableIteratorWithMetadata; import org.apache.druid.java.util.common.parsers.ParseException; -import io.mishmash.opentelemetry.persistence.proto.ProtobufLogs; import io.mishmash.opentelemetry.persistence.proto.v1.LogsPersistenceProto.PersistedLog; +import io.mishmash.opentelemetry.persistence.protobuf.ProtobufLogs; import io.mishmash.opentelemetry.server.collector.Log; import io.mishmash.opentelemetry.server.collector.LogsFlattener; import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; @@ -53,17 +55,24 @@ public class LogsReader extends IntermediateRowParsingReader { * True if the 'raw' format was configured. */ private boolean isRaw = false; + /** + * The ingestion schema config. + */ + private InputRowSchema schema; /** * Create an OTLP logs reader. * + * @param rowSchema the schema as set in ingestion config * @param input the {@link InputEntity} containing protobuf-encoded bytes * @param isRawFormat true if input contains a 'raw' * {@link ExportLogsServiceRequest} */ public LogsReader( + final InputRowSchema rowSchema, final InputEntity input, final boolean isRawFormat) { + this.schema = rowSchema; this.source = input; this.isRaw = isRawFormat; } @@ -75,8 +84,10 @@ public LogsReader( protected List parseInputRows( final PersistedLog intermediateRow) throws IOException, ParseException { - // TODO Auto-generated method stub - return null; + return Collections.singletonList( + MapInputRowParser.parse( + schema, + ProtobufLogs.toJsonMap(intermediateRow))); } /** diff --git a/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/MetricsReader.java b/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/MetricsReader.java index 4040e60..ac9184b 100644 --- a/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/MetricsReader.java +++ b/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/MetricsReader.java @@ -25,12 +25,14 @@ import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.IntermediateRowParsingReader; +import org.apache.druid.data.input.impl.MapInputRowParser; import org.apache.druid.java.util.common.parsers.CloseableIteratorWithMetadata; import org.apache.druid.java.util.common.parsers.ParseException; -import io.mishmash.opentelemetry.persistence.proto.ProtobufMetrics; import io.mishmash.opentelemetry.persistence.proto.v1.MetricsPersistenceProto.PersistedMetric; +import io.mishmash.opentelemetry.persistence.protobuf.ProtobufMetrics; import io.mishmash.opentelemetry.server.collector.MetricDataPoint; import io.mishmash.opentelemetry.server.collector.MetricsFlattener; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; @@ -54,17 +56,24 @@ public class MetricsReader * True if the 'raw' format was configured. */ private boolean isRaw = false; + /** + * The ingestion schema config. + */ + private InputRowSchema schema; /** * Create an OTLP metrics reader. * + * @param rowSchema the schema as set in ingestion config * @param input the {@link InputEntity} containing protobuf-encoded bytes * @param isRawFormat true if input contains a 'raw' * {@link ExportMetricsServiceRequest} */ public MetricsReader( + final InputRowSchema rowSchema, final InputEntity input, final boolean isRawFormat) { + this.schema = rowSchema; this.source = input; this.isRaw = isRawFormat; } @@ -76,8 +85,10 @@ public MetricsReader( protected List parseInputRows( final PersistedMetric intermediateRow) throws IOException, ParseException { - // TODO Auto-generated method stub - return null; + return Collections.singletonList( + MapInputRowParser.parse( + schema, + ProtobufMetrics.toJsonMap(intermediateRow))); } /** diff --git a/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/OTLPInputFormat.java b/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/OTLPInputFormat.java index 3cd0143..bf18510 100644 --- a/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/OTLPInputFormat.java +++ b/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/OTLPInputFormat.java @@ -123,29 +123,23 @@ public InputEntityReader createReader( final InputRowSchema inputRowSchema, final InputEntity source, final File temporaryDirectory) { - // use to filter out columns: - inputRowSchema.getColumnsFilter(); - inputRowSchema.getDimensionsSpec(); - inputRowSchema.getTimestampSpec(); - inputRowSchema.getMetricNames(); - switch (getOtlpInputSignal()) { case LOGS_FLAT: - return new LogsReader(source, false); + return new LogsReader(inputRowSchema, source, false); case LOGS_RAW: - return new LogsReader(source, true); + return new LogsReader(inputRowSchema, source, true); case METRICS_FLAT: - return new MetricsReader(source, false); + return new MetricsReader(inputRowSchema, source, false); case METRICS_RAW: - return new MetricsReader(source, true); + return new MetricsReader(inputRowSchema, source, true); case TRACES_FLAT: - return new TracesReader(source, false); + return new TracesReader(inputRowSchema, source, false); case TRACES_RAW: - return new TracesReader(source, true); + return new TracesReader(inputRowSchema, source, true); case PROFILES_FLAT: - return new ProfilesReader(source, false); + return new ProfilesReader(inputRowSchema, source, false); case PROFILES_RAW: - return new ProfilesReader(source, true); + return new ProfilesReader(inputRowSchema, source, true); default: // should not happen throw new UnsupportedOperationException("Internal error"); diff --git a/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/ProfilesReader.java b/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/ProfilesReader.java index 72a4a4e..f4c479b 100644 --- a/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/ProfilesReader.java +++ b/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/ProfilesReader.java @@ -25,12 +25,14 @@ import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.IntermediateRowParsingReader; +import org.apache.druid.data.input.impl.MapInputRowParser; import org.apache.druid.java.util.common.parsers.CloseableIteratorWithMetadata; import org.apache.druid.java.util.common.parsers.ParseException; -import io.mishmash.opentelemetry.persistence.proto.ProtobufProfiles; import io.mishmash.opentelemetry.persistence.proto.v1.ProfilesPersistenceProto.PersistedProfile; +import io.mishmash.opentelemetry.persistence.protobuf.ProtobufProfiles; import io.mishmash.opentelemetry.server.collector.ProfileSampleValue; import io.mishmash.opentelemetry.server.collector.ProfilesFlattener; import io.opentelemetry.proto.collector.profiles.v1experimental.ExportProfilesServiceRequest; @@ -54,17 +56,24 @@ public class ProfilesReader * True if the 'raw' format was configured. */ private boolean isRaw = false; + /** + * The ingestion schema config. + */ + private InputRowSchema schema; /** * Create an OTLP profiles reader. * + * @param rowSchema the schema as set in ingestion config * @param input the {@link InputEntity} containing protobuf-encoded bytes * @param isRawFormat true if input contains a 'raw' * {@link ExportProfilesServiceRequest} */ public ProfilesReader( + final InputRowSchema rowSchema, final InputEntity input, final boolean isRawFormat) { + this.schema = rowSchema; this.source = input; this.isRaw = isRawFormat; } @@ -76,8 +85,10 @@ public ProfilesReader( protected List parseInputRows( final PersistedProfile intermediateRow) throws IOException, ParseException { - // TODO Auto-generated method stub - return null; + return Collections.singletonList( + MapInputRowParser.parse( + schema, + ProtobufProfiles.toJsonMap(intermediateRow))); } /** diff --git a/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/TracesReader.java b/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/TracesReader.java index d9a2680..64fafbb 100644 --- a/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/TracesReader.java +++ b/druid-input-format/src/main/java/io/mishmash/opentelemetry/druid/format/TracesReader.java @@ -25,12 +25,14 @@ import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.IntermediateRowParsingReader; +import org.apache.druid.data.input.impl.MapInputRowParser; import org.apache.druid.java.util.common.parsers.CloseableIteratorWithMetadata; import org.apache.druid.java.util.common.parsers.ParseException; -import io.mishmash.opentelemetry.persistence.proto.ProtobufSpans; import io.mishmash.opentelemetry.persistence.proto.v1.TracesPersistenceProto.PersistedSpan; +import io.mishmash.opentelemetry.persistence.protobuf.ProtobufSpans; import io.mishmash.opentelemetry.server.collector.Span; import io.mishmash.opentelemetry.server.collector.TracesFlattener; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; @@ -53,17 +55,24 @@ public class TracesReader extends IntermediateRowParsingReader { * True if the 'raw' format was configured. */ private boolean isRaw = false; + /** + * The ingestion schema config. + */ + private InputRowSchema schema; /** - * Create an OTLP logs reader. + * Create an OTLP traces reader. * + * @param rowSchema the schema as set in ingestion config * @param input the {@link InputEntity} containing protobuf-encoded bytes * @param isRawFormat true if input contains a 'raw' * {@link ExportTraceServiceRequest} */ public TracesReader( + final InputRowSchema rowSchema, final InputEntity input, final boolean isRawFormat) { + this.schema = rowSchema; this.source = input; this.isRaw = isRawFormat; } @@ -75,8 +84,10 @@ public TracesReader( protected List parseInputRows( final PersistedSpan intermediateRow) throws IOException, ParseException { - // TODO Auto-generated method stub - return null; + return Collections.singletonList( + MapInputRowParser.parse( + schema, + ProtobufSpans.toJsonMap(intermediateRow))); } /** diff --git a/persistence-protobuf/pom.xml b/persistence-protobuf/pom.xml index 9e1fc59..2dbdff9 100644 --- a/persistence-protobuf/pom.xml +++ b/persistence-protobuf/pom.xml @@ -137,6 +137,12 @@ collector-embedded ${project.version} + + + org.junit.jupiter + junit-jupiter-engine + test + diff --git a/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufLogs.java b/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/protobuf/ProtobufLogs.java similarity index 98% rename from persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufLogs.java rename to persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/protobuf/ProtobufLogs.java index 8df8b45..19b4dce 100644 --- a/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufLogs.java +++ b/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/protobuf/ProtobufLogs.java @@ -15,7 +15,7 @@ * */ -package io.mishmash.opentelemetry.persistence.proto; +package io.mishmash.opentelemetry.persistence.protobuf; import java.util.Map; diff --git a/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufMetrics.java b/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/protobuf/ProtobufMetrics.java similarity index 99% rename from persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufMetrics.java rename to persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/protobuf/ProtobufMetrics.java index e7488e8..0110146 100644 --- a/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufMetrics.java +++ b/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/protobuf/ProtobufMetrics.java @@ -15,7 +15,7 @@ * */ -package io.mishmash.opentelemetry.persistence.proto; +package io.mishmash.opentelemetry.persistence.protobuf; import java.util.Map; diff --git a/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufProfiles.java b/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/protobuf/ProtobufProfiles.java similarity index 99% rename from persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufProfiles.java rename to persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/protobuf/ProtobufProfiles.java index 848ffd8..29fe6dc 100644 --- a/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufProfiles.java +++ b/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/protobuf/ProtobufProfiles.java @@ -15,7 +15,7 @@ * */ -package io.mishmash.opentelemetry.persistence.proto; +package io.mishmash.opentelemetry.persistence.protobuf; import java.util.ArrayList; import java.util.Collections; diff --git a/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufSpans.java b/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/protobuf/ProtobufSpans.java similarity index 98% rename from persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufSpans.java rename to persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/protobuf/ProtobufSpans.java index f7673ac..b0e860d 100644 --- a/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufSpans.java +++ b/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/protobuf/ProtobufSpans.java @@ -15,7 +15,7 @@ * */ -package io.mishmash.opentelemetry.persistence.proto; +package io.mishmash.opentelemetry.persistence.protobuf; import java.util.Map; diff --git a/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufUtils.java b/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/protobuf/ProtobufUtils.java similarity index 67% rename from persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufUtils.java rename to persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/protobuf/ProtobufUtils.java index c7e281f..d068b5a 100644 --- a/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufUtils.java +++ b/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/protobuf/ProtobufUtils.java @@ -15,7 +15,7 @@ * */ -package io.mishmash.opentelemetry.persistence.proto; +package io.mishmash.opentelemetry.persistence.protobuf; import java.util.ArrayList; import java.util.HashMap; @@ -37,6 +37,9 @@ import com.google.protobuf.UInt32Value; import com.google.protobuf.UInt64Value; +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.KeyValue; + /** * Various helper methods for general protobuf message handling. */ @@ -47,34 +50,35 @@ public final class ProtobufUtils { * * Helps with avoiding unnecessary nesting. */ - private static Map> converters; + private static final Map> CONVERTERS = + new HashMap<>(); static { - converters.put( + CONVERTERS.put( BoolValue.getDescriptor().getFullName(), ProtobufUtils::toJsonNestedValue); - converters.put( + CONVERTERS.put( Int32Value.getDescriptor().getFullName(), ProtobufUtils::toJsonNestedValue); - converters.put( + CONVERTERS.put( UInt32Value.getDescriptor().getFullName(), ProtobufUtils::toJsonNestedValue); - converters.put( + CONVERTERS.put( Int64Value.getDescriptor().getFullName(), ProtobufUtils::toJsonNestedValue); - converters.put( + CONVERTERS.put( UInt64Value.getDescriptor().getFullName(), ProtobufUtils::toJsonNestedValue); - converters.put( + CONVERTERS.put( StringValue.getDescriptor().getFullName(), ProtobufUtils::toJsonNestedValue); - converters.put( + CONVERTERS.put( BytesValue.getDescriptor().getFullName(), ProtobufUtils::toJsonNestedValue); - converters.put( + CONVERTERS.put( FloatValue.getDescriptor().getFullName(), ProtobufUtils::toJsonNestedValue); - converters.put( + CONVERTERS.put( DoubleValue.getDescriptor().getFullName(), ProtobufUtils::toJsonNestedValue); @@ -97,7 +101,7 @@ public static Map toJsonMap( Map res = new HashMap<>(entries.size()); for (Map.Entry ent : entries.entrySet()) { - res.put(ent.getKey().getJsonName(), + res.put(ent.getKey().getName(), toJsonValue(ent.getKey(), ent.getValue())); } @@ -114,7 +118,7 @@ private static Object toJsonValue( if (value instanceof Message) { Message msg = (Message) value; Function converter = - converters.get(msg.getDescriptorForType().getFullName()); + CONVERTERS.get(msg.getDescriptorForType().getFullName()); if (converter != null) { return converter.apply(msg); @@ -150,19 +154,72 @@ private static Object toJsonValue( return resMap; } else if (field.isRepeated()) { - List valuesList = (List) value; - List resList = new ArrayList<>(valuesList.size()); + if (FieldDescriptor.Type.MESSAGE.equals(field.getType()) + && "opentelemetry.proto.common.v1.KeyValue".equals( + field.getMessageType().getFullName())) { + @SuppressWarnings("unchecked") + List kvs = (List) value; - for (Object o : valuesList) { - resList.add(toJsonValueSingle(field, o)); - } + return toJsonValue(kvs); + } else { + List valuesList = (List) value; + List resList = new ArrayList<>(valuesList.size()); + + for (Object o : valuesList) { + resList.add(toJsonValueSingle(field, o)); + } - return resList; + return resList; + } } else { return toJsonValueSingle(field, value); } } + private static Object toJsonValue(final List kvs) { + Map res = new HashMap<>(kvs.size()); + + for (KeyValue kv : kvs) { + res.put( + kv.getKey(), + toJsonValue(kv.getValue())); + } + + return res; + } + + private static Object toJsonValue(final AnyValue av) { + switch (av.getValueCase()) { + case ARRAY_VALUE: + List res = new ArrayList<>( + av.getArrayValue().getValuesCount()); + for (AnyValue a : av.getArrayValue().getValuesList()) { + res.add(toJsonValue(a)); + } + + return res; + case BOOL_VALUE: + return av.getBoolValue(); + case BYTES_VALUE: + return av.getBytesValue().toByteArray(); + case DOUBLE_VALUE: + return av.getDoubleValue(); + case INT_VALUE: + return av.getIntValue(); + case KVLIST_VALUE: + return toJsonValue(av.getKvlistValue().getValuesList()); + case STRING_VALUE: + return av.getStringValue(); + case VALUE_NOT_SET: + return null; + default: + // should not happen! + throw new IllegalArgumentException( + String.format("Unsupported AnyValue type '%s'", + av.getValueCase())); + } + } + private static Object toJsonValueSingle( final FieldDescriptor field, final Object value) { @@ -173,6 +230,9 @@ private static Object toJsonValueSingle( if ("google.protobuf.NullValue" .equals(field.getEnumType().getFullName())) { return null; + } else if ("opentelemetry.proto.logs.v1.SeverityNumber" + .equals(field.getEnumType().getFullName())) { + return ((EnumValueDescriptor) value).getNumber(); } return ((EnumValueDescriptor) value).getName(); diff --git a/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/package-info.java b/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/protobuf/package-info.java similarity index 96% rename from persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/package-info.java rename to persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/protobuf/package-info.java index 6a9f1d0..62c2405 100644 --- a/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/package-info.java +++ b/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/protobuf/package-info.java @@ -34,4 +34,4 @@ * This package on GitHub * @see mishmash io */ -package io.mishmash.opentelemetry.persistence.proto; +package io.mishmash.opentelemetry.persistence.protobuf; diff --git a/persistence-protobuf/src/main/proto/io/mishmash/opentelemetry/persistence/proto/v1/logs_persistence.proto b/persistence-protobuf/src/main/proto/io/mishmash/opentelemetry/persistence/proto/v1/logs_persistence.proto index 862c6ec..101e71c 100644 --- a/persistence-protobuf/src/main/proto/io/mishmash/opentelemetry/persistence/proto/v1/logs_persistence.proto +++ b/persistence-protobuf/src/main/proto/io/mishmash/opentelemetry/persistence/proto/v1/logs_persistence.proto @@ -79,9 +79,11 @@ message PersistedLog { // https://opentelemetry.io/docs/specs/otel/schemas/#schema-url string log_schema_url = 28; + reserved 29, 30; + // is_valid will be false if the system found this record to not comply with OTLP specs. - bool is_valid = 29; + bool is_valid = 100; // when is_valid is false, error_message will provide some information on what was wrong. - string error_message = 30; + optional string error_message = 101; } diff --git a/persistence-protobuf/src/main/proto/io/mishmash/opentelemetry/persistence/proto/v1/metrics_persistence.proto b/persistence-protobuf/src/main/proto/io/mishmash/opentelemetry/persistence/proto/v1/metrics_persistence.proto index 2bbe8b1..b56e8f5 100644 --- a/persistence-protobuf/src/main/proto/io/mishmash/opentelemetry/persistence/proto/v1/metrics_persistence.proto +++ b/persistence-protobuf/src/main/proto/io/mishmash/opentelemetry/persistence/proto/v1/metrics_persistence.proto @@ -115,9 +115,11 @@ message PersistedMetric { // Additional metadata attributes that describe the metric, optional. repeated .opentelemetry.proto.common.v1.KeyValue metric_metadata = 48; + reserved 49, 50; + // is_valid will be false if the system found this record to not comply with OTLP specs. - bool is_valid = 49; + bool is_valid = 100; // when is_valid is false, error_message will provide some information on what was wrong. - string error_message = 50; + optional string error_message = 101; } diff --git a/persistence-protobuf/src/main/proto/io/mishmash/opentelemetry/persistence/proto/v1/profiles_persistence.proto b/persistence-protobuf/src/main/proto/io/mishmash/opentelemetry/persistence/proto/v1/profiles_persistence.proto index dd54b61..171de98 100644 --- a/persistence-protobuf/src/main/proto/io/mishmash/opentelemetry/persistence/proto/v1/profiles_persistence.proto +++ b/persistence-protobuf/src/main/proto/io/mishmash/opentelemetry/persistence/proto/v1/profiles_persistence.proto @@ -103,11 +103,13 @@ message PersistedProfile { // the value type StrValueType type = 39; + reserved 40, 41; + // is_valid will be false if the system found this record to not comply with OTLP specs. - bool is_valid = 40; + bool is_valid = 100; // when is_valid is false, error_message will provide some information on what was wrong. - string error_message = 41; + optional string error_message = 101; } // An opentelemetry.proto.profiles.v1experimental.ValueType with extracted strings diff --git a/persistence-protobuf/src/main/proto/io/mishmash/opentelemetry/persistence/proto/v1/traces_persistence.proto b/persistence-protobuf/src/main/proto/io/mishmash/opentelemetry/persistence/proto/v1/traces_persistence.proto index a330d13..e4563e5 100644 --- a/persistence-protobuf/src/main/proto/io/mishmash/opentelemetry/persistence/proto/v1/traces_persistence.proto +++ b/persistence-protobuf/src/main/proto/io/mishmash/opentelemetry/persistence/proto/v1/traces_persistence.proto @@ -74,9 +74,11 @@ message PersistedSpan { // https://opentelemetry.io/docs/specs/otel/schemas/#schema-url string span_schema_url = 27; + reserved 28, 29; + // is_valid will be false if the system found this record to not comply with OTLP specs. - bool is_valid = 28; + bool is_valid = 100; // when is_valid is false, error_message will provide some information on what was wrong. - string error_message = 29; + optional string error_message = 101; } diff --git a/persistence-protobuf/src/test/java/io/mishmash/opentelemetry/persistence/protobuf/Base.java b/persistence-protobuf/src/test/java/io/mishmash/opentelemetry/persistence/protobuf/Base.java new file mode 100644 index 0000000..1caa365 --- /dev/null +++ b/persistence-protobuf/src/test/java/io/mishmash/opentelemetry/persistence/protobuf/Base.java @@ -0,0 +1,438 @@ +/* + * Copyright 2024 Mishmash IO UK Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.mishmash.opentelemetry.persistence.protobuf; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertIterableEquals; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import com.google.protobuf.ByteString; + +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.ArrayValue; +import io.opentelemetry.proto.common.v1.InstrumentationScope; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.common.v1.KeyValueList; +import io.opentelemetry.proto.resource.v1.Resource; + +public class Base { + + private static final long BATCH_TIMESTAMP = 1; + private static final String BATCH_UUID = "test batch uuid"; + + private static final String RESOURCE_SCHEMA_URL = "http://resource.schema.test"; + private static final int RESOURCE_DROPPED_ATTRIBUTES_COUNT = 10; + private static final String RESOURCE_ATTR_KEY = "resource-attribute"; + private static final String RESOURCE_ATTR_VAL = "test RA"; + private static final String SCOPE_ATTR_KEY = "scope-attribute"; + private static final int SCOPE_ATTR_VAL = 3; + private static final int SCOPE_DROPPED_ATTRIBUTES_COUNT = 11; + private static final String SCOPE_NAME = "test-scope-name"; + private static final String SCOPE_VERSION = "1.0.0-test"; + + public long batchTimestamp() { + return BATCH_TIMESTAMP; + } + + public void testBatchTimestamp(long actual) { + testBatchTimestamp(batchTimestamp(), actual); + } + + public void testBatchTimestamp(long expected, long actual) { + assertEquals(expected, actual); + } + + public String batchUUID() { + return BATCH_UUID; + } + + public void testBatchUUID(String actual) { + testBatchUUID(batchUUID(), actual); + } + + public void testBatchUUID(String expected, String actual) { + assertEquals(expected, actual); + } + + public String resourceSchemaUrl() { + return RESOURCE_SCHEMA_URL; + } + + public void testResourceSchemaUrl(String actual) { + testResourceSchemaUrl(resourceSchemaUrl(), actual); + } + + public void testResourceSchemaUrl(String expected, String actual) { + assertEquals(expected, actual); + } + + public Integer resourceDroppedAttributesCount() { + return RESOURCE_DROPPED_ATTRIBUTES_COUNT; + } + + public void testResourceDroppedAttributesCount(Integer actual) { + testResourceDroppedAttributesCount( + resourceDroppedAttributesCount(), + actual); + } + + public void testResourceDroppedAttributesCount( + Integer expected, + Integer actual) { + assertEquals(expected, actual); + } + + public Resource resource() { + return resource( + resourceDroppedAttributesCount(), + resourceAttributes()); + } + + public Resource resource( + Integer droppedAttrsCount, + Iterable attrs) { + Resource.Builder builder = Resource.newBuilder(); + + if (droppedAttrsCount != null) { + builder = builder.setDroppedAttributesCount(droppedAttrsCount); + } + + if (attrs != null) { + builder = builder.addAllAttributes(attrs); + } + + return builder.build(); + } + + public void testResource(Resource actual) { + testResource(resource(), actual); + } + + public void testResource(Resource expected, Resource actual) { + assertEquals( + expected.getDroppedAttributesCount(), + actual.getDroppedAttributesCount()); + testAttributes( + expected.getAttributesList(), + actual.getAttributesList()); + } + + public Iterable resourceAttributes() { + return attributes( + Collections.singleton( + Map.entry( + RESOURCE_ATTR_KEY, + RESOURCE_ATTR_VAL))); + } + + public Iterable attributes( + Collection> a) { + if (a == null) { + return null; + } + + List res = new ArrayList<>(a.size()); + for (Map.Entry ent : a) { + res.add(keyValue(ent.getKey(), ent.getValue())); + } + + return res; + } + + public void testAttributes( + Iterable expected, + Iterable actual) { + assertIterableEquals(expected, actual); + } + + public String scopeName() { + return SCOPE_NAME; + } + + public void testScopeName(String actual) { + testScopeName(scopeName(), actual); + } + + public void testScopeName(String expected, String actual) { + assertEquals(expected, actual); + } + + public String scopeVersion() { + return SCOPE_VERSION; + } + + public void testScopeVersion(String actual) { + testScopeVersion(scopeVersion(), actual); + } + + public void testScopeVersion(String expected, String actual) { + assertEquals(expected, actual); + } + + public Integer scopeDroppedAttributesCount() { + return SCOPE_DROPPED_ATTRIBUTES_COUNT; + } + + public void testScopeDroppedAttributesCount(Integer actual) { + testScopeDroppedAttributesCount(scopeDroppedAttributesCount(), actual); + } + + public void testScopeDroppedAttributesCount( + Integer expected, + Integer actual) { + assertEquals(expected, actual); + } + + public Iterable scopeAttributes() { + return attributes( + Collections.singleton( + Map.entry( + SCOPE_ATTR_KEY, + SCOPE_ATTR_VAL))); + } + + public InstrumentationScope scope() { + return scope(scopeName(), + scopeVersion(), + scopeDroppedAttributesCount(), + scopeAttributes()); + } + + public InstrumentationScope scope( + String name, + String version, + Integer droppedAttrsCount, + Iterable attrs) { + InstrumentationScope.Builder builder = InstrumentationScope + .newBuilder(); + + if (name != null) { + builder = builder.setName(name); + } + + if (version != null) { + builder = builder.setVersion(version); + } + + if (droppedAttrsCount != null) { + builder = builder.setDroppedAttributesCount(droppedAttrsCount); + } + + if (attrs != null) { + builder = builder.addAllAttributes(attrs); + } + + return builder.build(); + } + + public void testScope(InstrumentationScope actual) { + testScope(scope(), actual); + } + + public void testScope( + InstrumentationScope expected, + InstrumentationScope actual) { + assertEquals(expected.getName(), actual.getName()); + assertEquals(expected.getVersion(), actual.getVersion()); + assertEquals( + expected.getDroppedAttributesCount(), + actual.getDroppedAttributesCount()); + testAttributes( + expected.getAttributesList(), + actual.getAttributesList()); + } + + public KeyValue keyValue(String key, Object value) { + KeyValue.Builder builder = KeyValue.newBuilder() + .setKey(key); + + if (value != null) { + builder = builder.setValue(anyValue(value)); + } + + return builder.build(); + } + + public AnyValue anyValue(String valueType, Object value) { + return anyValue(AnyValue.ValueCase.valueOf(valueType), value); + } + + @SuppressWarnings("unchecked") + public AnyValue anyValue(AnyValue.ValueCase vCase, Object value) { + AnyValue.Builder builder = AnyValue.newBuilder(); + + switch (vCase) { + case ARRAY_VALUE: + builder = builder.setArrayValue( + arrayValue((Collection) value)); + break; + case BOOL_VALUE: + builder = builder.setBoolValue((boolean) value); + break; + case BYTES_VALUE: + builder = builder.setBytesValue( + ByteString.copyFrom((byte[]) value)); + break; + case DOUBLE_VALUE: + builder = builder.setDoubleValue((double) value); + break; + case INT_VALUE: + builder = builder.setIntValue((long) value); + break; + case KVLIST_VALUE: + builder = builder.setKvlistValue( + keyValueList( + (Collection>) value)); + break; + case STRING_VALUE: + builder = builder.setStringValue((String) value); + break; + case VALUE_NOT_SET: + break; + default: + throw new IllegalArgumentException("Unknown AnyValue case"); + } + + return builder.build(); + } + + public AnyValue anyValue(Object o) { + AnyValue.Builder builder = AnyValue.newBuilder(); + + if (o == null) { + return builder.build(); + } + + if (o instanceof String s) { + builder = builder.setStringValue(s); + } else if (o instanceof Boolean b) { + builder = builder.setBoolValue(b); + } else if (o instanceof Long l) { + builder = builder.setIntValue(l); + } else if (o instanceof Integer i) { + builder = builder.setIntValue(Long.valueOf(i)); + } else if (o instanceof Double d) { + builder = builder.setDoubleValue(d); + } else if (o instanceof Float f) { + builder = builder.setDoubleValue(Double.valueOf(f)); + } else if (o instanceof Map m) { + builder = builder.setKvlistValue(keyValueList(m.entrySet())); + } else if (o instanceof byte[] bytes) { + builder = builder.setBytesValue(ByteString.copyFrom(bytes)); + } else if (o instanceof ByteString byteString) { + builder = builder.setBytesValue(byteString); + } else { + throw new IllegalArgumentException("Unknown AnyValue object type"); + } + + return builder.build(); + } + + public void testAnyValue(AnyValue expected, AnyValue actual) { + assertEquals(expected.getValueCase(), actual.getValueCase()); + + switch (expected.getValueCase()) { + case STRING_VALUE: + assertEquals( + expected.getStringValue(), + actual.getStringValue()); + break; + case BOOL_VALUE: + assertEquals( + expected.getBoolValue(), + actual.getBoolValue()); + break; + case INT_VALUE: + assertEquals( + expected.getIntValue(), + actual.getIntValue()); + break; + case DOUBLE_VALUE: + assertEquals( + expected.getDoubleValue(), + actual.getDoubleValue()); + break; + case ARRAY_VALUE: + testArrayValue( + expected.getArrayValue(), + actual.getArrayValue()); + break; + case KVLIST_VALUE: + testKeyValueList( + expected.getKvlistValue(), + actual.getKvlistValue()); + break; + case BYTES_VALUE: + assertArrayEquals( + expected.getBytesValue().toByteArray(), + actual.getBytesValue().toByteArray()); + break; + case VALUE_NOT_SET: + break; + default: + } + } + + public ArrayValue arrayValue(Collection list) { + if (list == null) { + return null; + } + + ArrayValue.Builder builder = ArrayValue.newBuilder(); + for (Object o : list) { + builder = builder.addValues(anyValue(o)); + } + + return builder.build(); + } + + public void testArrayValue(ArrayValue expected, ArrayValue actual) { + assertEquals( + expected.getValuesCount(), + actual.getValuesCount()); + + for (int i = 0; i < expected.getValuesCount(); i++) { + testAnyValue( + expected.getValues(i), + actual.getValues(i)); + } + } + + public KeyValueList keyValueList(Collection> kv) { + if (kv == null) { + return null; + } + + KeyValueList.Builder builder = KeyValueList.newBuilder(); + for (Map.Entry ent : kv) { + builder = builder.addValues(keyValue(ent.getKey(), ent.getValue())); + } + + return builder.build(); + } + + public void testKeyValueList(KeyValueList expected, KeyValueList actual) { + + } +} diff --git a/persistence-protobuf/src/test/java/io/mishmash/opentelemetry/persistence/protobuf/ProtobufLogsTests.java b/persistence-protobuf/src/test/java/io/mishmash/opentelemetry/persistence/protobuf/ProtobufLogsTests.java new file mode 100644 index 0000000..61d7c00 --- /dev/null +++ b/persistence-protobuf/src/test/java/io/mishmash/opentelemetry/persistence/protobuf/ProtobufLogsTests.java @@ -0,0 +1,263 @@ +/* + * Copyright 2024 Mishmash IO UK Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.mishmash.opentelemetry.persistence.protobuf; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.Collections; +import java.util.Map; + +import org.junit.jupiter.api.Test; + +import com.google.protobuf.ByteString; + +import io.mishmash.opentelemetry.persistence.proto.v1.LogsPersistenceProto.PersistedLog; +import io.mishmash.opentelemetry.server.collector.Log; +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.logs.v1.LogRecord; +import io.opentelemetry.proto.logs.v1.ResourceLogs; +import io.opentelemetry.proto.logs.v1.ScopeLogs; +import io.opentelemetry.proto.logs.v1.SeverityNumber; + +public class ProtobufLogsTests extends Base { + + private static final String LOG_SCHEMA_URL = "http://log.schema.test"; + private static final int LOG_SEQ_NO = 2; + private static final long LOG_TIMESTAMP = 5; + private static final long LOG_OBSERVED_TIMESTAMP = 6; + private static final int LOG_SEVERITY_NUM = 7; + private static final String LOG_SEVERITY = "SEVERITY_DEBUG3"; + private static final String LOG_BODY = "test log message"; + private static final String LOG_ATTR_KEY = "log-attr"; + private static final Boolean LOG_ATTR_VAL = Boolean.FALSE; + private static final Integer LOG_DROPPED_ATTRS_COUNT = 0; + private static final int LOG_FLAGS = 0; + private static final byte[] LOG_TRACE_ID = new byte[] {0x11}; + private static final byte[] LOG_SPAN_ID = new byte[] {0x12}; + + @Test + void buildPersistedLog() { + Log l = defaultLog(); + PersistedLog pl = ProtobufLogs.buildLog(l).build(); + + testBatchTimestamp(pl.getBatchTimestamp()); + testBatchUUID(pl.getBatchUUID()); + assertEquals(LOG_SEQ_NO, pl.getSeqNo()); + testResourceSchemaUrl(l.getResourceSchemaUrl()); + testResource(l.getResource()); + assertEquals(LOG_SCHEMA_URL, l.getLogSchemaUrl()); + testScope(l.getScope()); + testLog(l.getLog()); + assertEquals(l.isValid(), pl.getIsValid()); + assertEquals(l.getErrorMessage() != null, pl.hasErrorMessage()); + assertEquals( + l.getErrorMessage() == null + ? "" + : l.getErrorMessage(), + pl.getErrorMessage()); + } + + @Test + void toJsonMap() { + Log l = defaultLog(); + PersistedLog pl = ProtobufLogs.buildLog(l).build(); + + Map res = ProtobufLogs.toJsonMap(pl); + + testBatchTimestamp((Long) res.get("batch_timestamp")); + testBatchUUID((String) res.get("batch_UUID")); + assertEquals(LOG_SEQ_NO, res.get("seq_no")); + testResourceSchemaUrl((String) res.get("resource_schema_url")); + @SuppressWarnings("unchecked") + Map resAttrs = + (Map) res.get("resource_attributes"); + testResource( + resource( + (int) res.get("resource_dropped_attributes_count"), + resAttrs == null + ? null + : attributes(resAttrs.entrySet()))); + assertEquals(LOG_SCHEMA_URL, (String) res.get("log_schema_url")); + @SuppressWarnings("unchecked") + Map scopeAttrs = + (Map) res.get("scope_attributes"); + testScope( + scope( + (String) res.get("scope_name"), + (String) res.get("scope_version"), + (Integer) res.get("scope_dropped_attributes_count"), + scopeAttrs == null + ? null + : attributes(scopeAttrs.entrySet()))); + @SuppressWarnings("unchecked") + Map logAttrs = + (Map) res.get("attributes"); + testLog( + getLog( + (Long) res.get("time_unix_nano"), + (Long) res.get("observed_time_unix_nano"), + (Integer) res.get("severity_number"), + (String) res.get("severity_text"), + anyValue( + (String) res.get("body_type"), + (String) res.get("body_string")), + logAttrs == null + ? null + : attributes(logAttrs.entrySet()), + (Integer) res.get("dropped_attributes_count"), + (Integer) res.get("flags"), + (byte[]) res.get("trace_id"), + (byte[]) res.get("span_id"))); + assertEquals( + l.isValid(), + (Boolean) res.get("is_valid")); + assertEquals( + l.getErrorMessage(), + (String) res.get("error_message")); + } + + private Log defaultLog() { + Log l = new Log(null, null, null); + + l.setFrom(batchTimestamp(), + batchUUID(), + LOG_SEQ_NO, + resourceLogs(), + scopeLogs(), + getLog()); + + return l; + } + + private ResourceLogs resourceLogs() { + ResourceLogs.Builder builder = ResourceLogs + .newBuilder() + .setSchemaUrl(resourceSchemaUrl()) + .setResource(resource()); + + return builder.build(); + } + + private ScopeLogs scopeLogs() { + ScopeLogs.Builder builder = ScopeLogs.newBuilder() + .setSchemaUrl(LOG_SCHEMA_URL); + + builder = builder.setScope(scope()); + + return builder.build(); + } + + private LogRecord getLog( + final Long ts, + final Long observedTs, + final Integer severityNum, + final String severityText, + final AnyValue body, + final Iterable attributes, + final Integer droppedAttrsCount, + final Integer flags, + final byte[] traceId, + final byte[] spanId) { + LogRecord.Builder builder = LogRecord.newBuilder(); + + if (ts != null) { + builder = builder.setTimeUnixNano(ts); + } + + if (observedTs != null) { + builder = builder.setObservedTimeUnixNano(observedTs); + } + + if (severityNum != null) { + builder = builder.setSeverityNumberValue(severityNum); + } + + if (severityText != null) { + builder = builder.setSeverityText(severityText); + } + + if (body != null) { + builder = builder.setBody(body); + } + + if (attributes != null) { + builder = builder.addAllAttributes(attributes); + } + + if (droppedAttrsCount != null) { + builder = builder.setDroppedAttributesCount(droppedAttrsCount); + } + + if (flags != null) { + builder = builder.setFlags(flags); + } + + if (traceId != null) { + builder = builder.setTraceId(ByteString.copyFrom(traceId)); + } + + if (spanId != null) { + builder = builder.setSpanId(ByteString.copyFrom(spanId)); + } + + return builder.build(); + } + + private LogRecord getLog() { + return getLog( + LOG_TIMESTAMP, + LOG_OBSERVED_TIMESTAMP, + LOG_SEVERITY_NUM, + LOG_SEVERITY, + anyValue(LOG_BODY), + logAttributes(), + LOG_DROPPED_ATTRS_COUNT, + LOG_FLAGS, + LOG_TRACE_ID, + LOG_SPAN_ID); + } + + private void testLog(LogRecord actual) { + assertEquals(LOG_TIMESTAMP, actual.getTimeUnixNano()); + assertEquals(LOG_OBSERVED_TIMESTAMP, actual.getObservedTimeUnixNano()); + assertEquals( + SeverityNumber.forNumber(LOG_SEVERITY_NUM), + actual.getSeverityNumber()); + assertEquals(LOG_SEVERITY, actual.getSeverityText()); + testAnyValue(anyValue(LOG_BODY), actual.getBody()); + testAttributes(logAttributes(), actual.getAttributesList()); + assertEquals( + LOG_DROPPED_ATTRS_COUNT, + actual.getDroppedAttributesCount()); + assertEquals(LOG_FLAGS, actual.getFlags()); + assertArrayEquals( + LOG_TRACE_ID, + actual.getTraceId().toByteArray()); + assertArrayEquals( + LOG_SPAN_ID, + actual.getSpanId().toByteArray()); + } + + private Iterable logAttributes() { + return attributes( + Collections.singleton( + Map.entry(LOG_ATTR_KEY, LOG_ATTR_VAL))); + } +} diff --git a/server-parquet/src/main/java/io/mishmash/opentelemetry/server/parquet/FileLogs.java b/server-parquet/src/main/java/io/mishmash/opentelemetry/server/parquet/FileLogs.java index e00c04e..e443b96 100644 --- a/server-parquet/src/main/java/io/mishmash/opentelemetry/server/parquet/FileLogs.java +++ b/server-parquet/src/main/java/io/mishmash/opentelemetry/server/parquet/FileLogs.java @@ -25,8 +25,8 @@ import io.mishmash.opentelemetry.server.collector.Instrumentation; import io.mishmash.opentelemetry.server.collector.Log; import io.mishmash.opentelemetry.server.collector.LogsSubscriber; -import io.mishmash.opentelemetry.persistence.proto.ProtobufLogs; import io.mishmash.opentelemetry.persistence.proto.v1.LogsPersistenceProto.PersistedLog; +import io.mishmash.opentelemetry.persistence.protobuf.ProtobufLogs; import io.opentelemetry.api.metrics.LongCounter; import io.opentelemetry.api.metrics.ObservableLongGauge; import io.opentelemetry.api.trace.Span; diff --git a/server-parquet/src/main/java/io/mishmash/opentelemetry/server/parquet/FileMetrics.java b/server-parquet/src/main/java/io/mishmash/opentelemetry/server/parquet/FileMetrics.java index 2fd435d..bca2709 100644 --- a/server-parquet/src/main/java/io/mishmash/opentelemetry/server/parquet/FileMetrics.java +++ b/server-parquet/src/main/java/io/mishmash/opentelemetry/server/parquet/FileMetrics.java @@ -25,8 +25,8 @@ import io.mishmash.opentelemetry.server.collector.Instrumentation; import io.mishmash.opentelemetry.server.collector.MetricDataPoint; import io.mishmash.opentelemetry.server.collector.MetricsSubscriber; -import io.mishmash.opentelemetry.persistence.proto.ProtobufMetrics; import io.mishmash.opentelemetry.persistence.proto.v1.MetricsPersistenceProto.PersistedMetric; +import io.mishmash.opentelemetry.persistence.protobuf.ProtobufMetrics; import io.opentelemetry.api.metrics.LongCounter; import io.opentelemetry.api.metrics.ObservableLongGauge; import io.opentelemetry.api.trace.Span; diff --git a/server-parquet/src/main/java/io/mishmash/opentelemetry/server/parquet/FileProfiles.java b/server-parquet/src/main/java/io/mishmash/opentelemetry/server/parquet/FileProfiles.java index dde3702..28e8319 100644 --- a/server-parquet/src/main/java/io/mishmash/opentelemetry/server/parquet/FileProfiles.java +++ b/server-parquet/src/main/java/io/mishmash/opentelemetry/server/parquet/FileProfiles.java @@ -29,8 +29,8 @@ import io.opentelemetry.api.metrics.ObservableLongGauge; import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Scope; -import io.mishmash.opentelemetry.persistence.proto.ProtobufProfiles; import io.mishmash.opentelemetry.persistence.proto.v1.ProfilesPersistenceProto.PersistedProfile; +import io.mishmash.opentelemetry.persistence.protobuf.ProtobufProfiles; /** * Subscribes to incoming OpenTelemetry profiles and writes them to parquet diff --git a/server-parquet/src/main/java/io/mishmash/opentelemetry/server/parquet/FileSpans.java b/server-parquet/src/main/java/io/mishmash/opentelemetry/server/parquet/FileSpans.java index 20b59ec..d5ca293 100644 --- a/server-parquet/src/main/java/io/mishmash/opentelemetry/server/parquet/FileSpans.java +++ b/server-parquet/src/main/java/io/mishmash/opentelemetry/server/parquet/FileSpans.java @@ -25,8 +25,8 @@ import io.mishmash.opentelemetry.server.collector.Instrumentation; import io.mishmash.opentelemetry.server.collector.Span; import io.mishmash.opentelemetry.server.collector.SpansSubscriber; -import io.mishmash.opentelemetry.persistence.proto.ProtobufSpans; import io.mishmash.opentelemetry.persistence.proto.v1.TracesPersistenceProto.PersistedSpan; +import io.mishmash.opentelemetry.persistence.protobuf.ProtobufSpans; import io.opentelemetry.api.metrics.LongCounter; import io.opentelemetry.api.metrics.ObservableLongGauge; import io.opentelemetry.context.Scope;