diff --git a/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/Batch.java b/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/Batch.java index b68707b..c5e9414 100644 --- a/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/Batch.java +++ b/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/Batch.java @@ -32,18 +32,18 @@ * * Essentially it's used as a way of flow control - an OTLP client submits * a 'batch' of logs, metrics or traces and each individual item might be - * processed by more than one {@link LogsSubscriber}, {@link MetricsSubscriber} - * or {@link SpansSubscriber}. + * processed by more than one {@link LogsSubscriber}, {@link MetricsSubscriber}, + * {@link SpansSubscriber} or {@link ProfilesSubscriber}. * * A response (success or failure) cannot be returned to the client before * the entire OTLP packet is processed by all subscribers, but each subscriber * might work on its own pace. * - * So, a {@link LogsCollector}, a {@link MetricsCollector} or a - * {@link TracesCollector} will create a batch of elements and subscribers, - * load it with data from an OTLP packet and 'delay' the response to the - * client until all the processing - of all elements by all subscribers - - * is done. + * So, a {@link LogsCollector}, a {@link MetricsCollector}, a + * {@link TracesCollector} or a {@link ProfilesCollector} will create a batch + * of elements and subscribers, load it with data from an OTLP packet and + * 'delay' the response to the client until all the processing - of all + * elements by all subscribers - is done. * * @param the type of elements (or work items) of this batch */ diff --git a/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/SubscribersBatch.java b/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/SubscribersBatch.java index 1f38195..5993ff9 100644 --- a/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/SubscribersBatch.java +++ b/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/SubscribersBatch.java @@ -23,9 +23,10 @@ import io.vertx.ext.auth.User; /** - * A 'batch' of all {@link LogsSubscriber}s, {@link MetricsSubscriber}s - * or @{link SpansSubscribers} that were given the task to process - * an OpenTelemetry log record, metric data point or trace span. + * A 'batch' of all {@link LogsSubscriber}s, {@link MetricsSubscriber}s, + * {@link SpansSubscriber}s or {@link ProfilesSubscriber}s that were given + * the task to process an OpenTelemetry log record, metric data point or + * trace span. * * @param the subscriber record type */ diff --git a/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/package-info.java b/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/package-info.java index 5ef0710..dde76fb 100644 --- a/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/package-info.java +++ b/collector-embedded/src/main/java/io/mishmash/opentelemetry/server/collector/package-info.java @@ -20,8 +20,8 @@ *

* This package contains the main classes used to embed OpenTelemetry * collectors. Use one or all of {@link LogsCollector}, - * {@link MetricsCollector} and {@link TracesCollector} to create - * an OTEL data source for your system. + * {@link MetricsCollector}, {@link TracesCollector} and + * {@link ProfilesCollector} to create an OTEL data source for your system. *

*

* To use them: @@ -31,8 +31,9 @@ * *

  • * Implement one or more subscribers - {@link LogsSubscriber}, - * {@link MetricsSubscriber}, {@link SpansSubscriber} to receive - * incoming OpenTelemetry signals of a given type + * {@link MetricsSubscriber}, {@link SpansSubscriber}, + * {@link ProfilesSubscriber} to receive incoming OpenTelemetry signals + * of a given type *
  • *
  • * Subscribe them to the collector - @@ -47,11 +48,12 @@ * *

    * When OTLP clients connect to the server and submit data - each individual - * data item in them - a log entry, a metric data point or a span - will be - * delivered to the subscribers' {@link LogsSubscriber#onNext(Log)}, - * {@link MetricsSubscriber#onNext(MetricDataPoint)} or - * {@link SpansSubscriber#onNext(Span)} where you can make it available - * within the system you're embedding into. + * data item in them - a log entry, a metric data point, a span or a profile - + * will be delivered to the subscribers' {@link LogsSubscriber#onNext(Log)}, + * {@link MetricsSubscriber#onNext(MetricDataPoint)}, + * {@link SpansSubscriber#onNext(Span)} or + * {@link ProfilesSubscriber#onNext(ProfileSampleValue)} where you can make it + * available within the system you're embedding into. *

    *

    * This package is part of a broader set of OpenTelemetry-related activities diff --git a/persistence-protobuf/pom.xml b/persistence-protobuf/pom.xml new file mode 100644 index 0000000..9e1fc59 --- /dev/null +++ b/persistence-protobuf/pom.xml @@ -0,0 +1,142 @@ + + + + 4.0.0 + + + io.mishmash.opentelemetry + opentelemetry-server-embedded + 1.1.3 + + + persistence-protobuf + jar + + Protobuf persistence format for extracted OpenTelemetry signals + + Utility classes for protobuf serialization of OpenTelemetry signals as + extracted and 'flattened' by the embedded collectors. + + https://mishmash.io/open_source/opentelemetry + + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + + + + + mishmash io + https://mishmash.io + + + + scm:git:https://github.com/mishmash-io/opentelemetry-server-embedded.git + scm:git:https://github.com/mishmash-io/opentelemetry-server-embedded.git + https://github.com/mishmash-io/opentelemetry-server-embedded + + + + + Ivan Kountchev + i.kountchev@mishmash.io + mishmash io + https://mishmash.io + + developer + + + + Andrey Rusev + a.rusev@mishmash.io + www.linkedin.com/in/andrey-rusev-21894172 + mishmash io + https://mishmash.io + + architect + + + + + + + + kr.motd.maven + os-maven-plugin + 1.7.1 + + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.6.1 + + + com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} + + grpc-java + + + + compile + + + + compile + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + org.apache.maven.plugins + maven-source-plugin + + + org.apache.maven.plugins + maven-javadoc-plugin + + io.mishmash.opentelemetry.persistence.proto.v1 + + + + org.apache.maven.plugins + maven-checkstyle-plugin + + + + + + + com.google.protobuf + protobuf-java + + + io.mishmash.opentelemetry + collector-embedded + ${project.version} + + + + diff --git a/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufLogs.java b/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufLogs.java new file mode 100644 index 0000000..2b4fa4f --- /dev/null +++ b/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufLogs.java @@ -0,0 +1,128 @@ +/* + * Copyright 2024 Mishmash IO UK Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.mishmash.opentelemetry.persistence.proto; + +import io.mishmash.opentelemetry.persistence.proto.v1.LogsPersistenceProto.PersistedLog; +import io.mishmash.opentelemetry.server.collector.Log; +import io.opentelemetry.proto.common.v1.AnyValue; + +/** + * Utility class to help with protobuf serialization of {@link Log} instances. + */ +public final class ProtobufLogs { + + private ProtobufLogs() { + // constructor is hidden + } + + /** + * Get a protobuf representation of a {@link Log}. + * + * @param log the log signal + * @return a populated {@link PersistedLog.Builder} + */ + public static PersistedLog.Builder buildLog(final Log log) { + PersistedLog.Builder builder = PersistedLog.newBuilder() + .setBatchTimestamp(log.getBatchTimestamp()) + .setBatchUUID(log.getBatchUUID()) + .setSeqNo(log.getSeqNo()) + .setIsValid(log.isValid()); + + if (log.getErrorMessage() != null) { + builder = builder.setErrorMessage(log.getErrorMessage()); + } + + if (log.getResource() != null) { + builder = builder + .addAllResourceAttributes( + log.getResource().getAttributesList()) + .setResourceDroppedAttributesCount( + log.getResource().getDroppedAttributesCount()); + } + + if (log.getResourceSchemaUrl() != null) { + builder = builder + .setResourceSchemaUrl(log.getResourceSchemaUrl()); + } + + if (log.getScope() != null) { + builder = builder + .setScopeName(log.getScope().getName()) + .setScopeVersion(log.getScope().getVersion()) + .addAllScopeAttributes( + log.getScope().getAttributesList()) + .setScopeDroppedAttributesCount( + log.getScope().getDroppedAttributesCount()); + } + + if (log.getLog() != null) { + builder = builder + .setTimeUnixNano(log.getLog().getTimeUnixNano()) + .setObservedTimeUnixNano( + log.getLog().getObservedTimeUnixNano()) + .setSeverityNumber(log.getLog().getSeverityNumber()) + .setSeverityText(log.getLog().getSeverityText()) + .addAllAttributes(log.getLog().getAttributesList()) + .setDroppedAttributesCount( + log.getLog().getDroppedAttributesCount()) + .setFlags(log.getLog().getFlags()) + .setTraceId(log.getLog().getTraceId()) + .setSpanId(log.getLog().getSpanId()); + + AnyValue body = log.getLog().getBody(); + + builder = builder.setBodyType(body.getValueCase().name()); + + switch (body.getValueCase()) { + case ARRAY_VALUE: + builder = builder.setBodyArray(body.getArrayValue()); + break; + case BOOL_VALUE: + builder = builder.setBodyBool(body.getBoolValue()); + break; + case BYTES_VALUE: + builder = builder.setBodyBytes(body.getBytesValue()); + break; + case DOUBLE_VALUE: + builder = builder.setBodyDouble(body.getDoubleValue()); + break; + case INT_VALUE: + builder = builder.setBodyInt(body.getIntValue()); + break; + case KVLIST_VALUE: + builder = builder.setBodyKvlist(body.getKvlistValue()); + break; + case STRING_VALUE: + builder = builder.setBodyString(body.getStringValue()); + break; + case VALUE_NOT_SET: + // FIXME: what to do when not set? + break; + default: + // FIXME: should not ignore + break; + } + } + + if (log.getLogSchemaUrl() != null) { + builder = builder.setLogSchemaUrl(log.getLogSchemaUrl()); + } + + return builder; + } +} diff --git a/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufMetrics.java b/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufMetrics.java new file mode 100644 index 0000000..d077f06 --- /dev/null +++ b/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufMetrics.java @@ -0,0 +1,209 @@ +/* + * Copyright 2024 Mishmash IO UK Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.mishmash.opentelemetry.persistence.proto; + +import io.mishmash.opentelemetry.persistence.proto.v1.MetricsPersistenceProto.PersistedMetric; +import io.mishmash.opentelemetry.server.collector.MetricDataPoint; +import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint; +import io.opentelemetry.proto.metrics.v1.HistogramDataPoint; +import io.opentelemetry.proto.metrics.v1.NumberDataPoint; +import io.opentelemetry.proto.metrics.v1.SummaryDataPoint; + +/** + * Utility class to help with protobuf serialization of + * {@link MetricDataPoint} instances. + */ +public final class ProtobufMetrics { + + private ProtobufMetrics() { + // constructor is hidden + } + + /** + * Get a protobuf representation of a {@link MetricDataPoint}. + * + * @param metric the metric data point + * @return a populated {@link PersistedMetric.Builder} + */ + public static PersistedMetric.Builder buildMetricDataPoint( + final MetricDataPoint metric) { + PersistedMetric.Builder builder = PersistedMetric.newBuilder() + .setBatchTimestamp(metric.getBatchTimestamp()) + .setBatchUUID(metric.getBatchUUID()) + .setSeqNo(metric.getSeqNo()) + .setIsValid(metric.isValid()) + .setDatapointSeqNo(metric.getDatapointSeqNo()); + + if (metric.getErrorMessage() != null) { + builder = builder + .setErrorMessage(metric.getErrorMessage()); + } + + if (metric.getResource() != null) { + builder = builder + .addAllResourceAttributes( + metric.getResource().getAttributesList()) + .setResourceDroppedAttributesCount( + metric.getResource() + .getDroppedAttributesCount()); + } + + if (metric.getResourceSchemaUrl() != null) { + builder = builder + .setResourceSchemaUrl( + metric.getResourceSchemaUrl()); + } + + if (metric.getScope() != null) { + builder = builder + .setScopeName(metric.getScope().getName()) + .setScopeVersion(metric.getScope().getVersion()) + .addAllScopeAttributes( + metric.getScope().getAttributesList()) + .setScopeDroppedAttributesCount( + metric.getScope() + .getDroppedAttributesCount()); + } + + if (metric.getName() != null) { + builder = builder.setName(metric.getName()); + } + + if (metric.getDescription() != null) { + builder = builder.setDescription(metric.getDescription()); + } + + if (metric.getUnit() != null) { + builder = builder.setUnit(metric.getUnit()); + } + + if (metric.getType() != null) { + builder = builder.setType(metric.getType().name()); + } + + if (metric.getMetaData() != null) { + builder = builder + .addAllMetricMetadata(metric.getMetaData()); + } + + switch (metric.getType()) { + case EXPONENTIAL_HISTOGRAM: + ExponentialHistogramDataPoint ehdp = + metric.getExponentialHistogram(); + + builder = builder + .addAllAttributes(ehdp.getAttributesList()) + .setStartTimeUnixNano(ehdp.getStartTimeUnixNano()) + .setTimeUnixNano(ehdp.getTimeUnixNano()) + .addAllExemplars(ehdp.getExemplarsList()) + .setFlags(ehdp.getFlags()) + .setExponentialHistogramCount(ehdp.getCount()) + .setExponentialHistogramSum(ehdp.getSum()) + .setExponentialHistogramScale(ehdp.getScale()) + .setExponentialHistogramZeroCount(ehdp.getZeroCount()) + .setExponentialHistogramPositive(ehdp.getPositive()) + .setExponentialHistogramNegative(ehdp.getNegative()) + .setExponentialHistogramMin(ehdp.getMin()) + .setExponentialHistogramMax(ehdp.getMax()) + .setExponentialHistogramZeroThreshold( + ehdp.getZeroThreshold()) + .setAggregationTemporality( + metric.getAggregationTemporality()); + break; + case GAUGE: + NumberDataPoint gdp = metric.getGauge(); + + builder = builder + .addAllAttributes(gdp.getAttributesList()) + .setStartTimeUnixNano( + gdp.getStartTimeUnixNano()) + .setTimeUnixNano(gdp.getTimeUnixNano()) + .addAllExemplars(gdp.getExemplarsList()) + .setFlags(gdp.getFlags()) + .setGaugeType(gdp.getValueCase().name()) + .setGaugeDouble(gdp.getAsDouble()) + .setGaugeInt(gdp.getAsInt()); + break; + case HISTOGRAM: + HistogramDataPoint hdp = metric.getHistogram(); + + builder = builder + .addAllAttributes(hdp.getAttributesList()) + .setStartTimeUnixNano( + hdp.getStartTimeUnixNano()) + .setTimeUnixNano(hdp.getTimeUnixNano()) + .addAllExemplars(hdp.getExemplarsList()) + .setFlags(hdp.getFlags()) + .setHistogramCount(hdp.getCount()) + .setHistogramSum(hdp.getSum()) + .addAllHistogramBucketCounts( + hdp.getBucketCountsList()) + .addAllHistogramExplicitBounds( + hdp.getExplicitBoundsList()) + .setHistogramMin(hdp.getMin()) + .setHistogramMax(hdp.getMax()) + .setAggregationTemporality( + metric.getAggregationTemporality()); + break; + case SUM: + NumberDataPoint ndp = metric.getSum(); + + builder = builder + .addAllAttributes(ndp.getAttributesList()) + .setStartTimeUnixNano( + ndp.getStartTimeUnixNano()) + .setTimeUnixNano(ndp.getTimeUnixNano()) + .addAllExemplars(ndp.getExemplarsList()) + .setFlags(ndp.getFlags()) + .setSumType(ndp.getValueCase().name()) + .setSumDouble(ndp.getAsDouble()) + .setSumInt(ndp.getAsInt()) + .setAggregationTemporality( + metric.getAggregationTemporality()) + .setIsMonotonic(metric.isMonotonic()); + break; + case SUMMARY: + SummaryDataPoint sdp = metric.getSummary(); + + builder = builder + .addAllAttributes(sdp.getAttributesList()) + .setStartTimeUnixNano( + sdp.getStartTimeUnixNano()) + .setTimeUnixNano(sdp.getTimeUnixNano()) + .setFlags(sdp.getFlags()) + .setSummaryCount(sdp.getCount()) + .setSummarySum(sdp.getSum()) + .addAllSummaryQuantileValues( + sdp.getQuantileValuesList()); + break; + case DATA_NOT_SET: + // FIXME: skip for now + break; + default: + // FIXME: unknown type... + break; + } + + if (metric.getMetricSchemaUrl() != null) { + builder = builder + .setMetricSchemaUrl(metric.getMetricSchemaUrl()); + } + + return builder; + } +} diff --git a/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufProfiles.java b/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufProfiles.java new file mode 100644 index 0000000..91b31e6 --- /dev/null +++ b/persistence-protobuf/src/main/java/io/mishmash/opentelemetry/persistence/proto/ProtobufProfiles.java @@ -0,0 +1,444 @@ +/* + * Copyright 2024 Mishmash IO UK Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.mishmash.opentelemetry.persistence.proto; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import io.mishmash.opentelemetry.persistence.proto.v1.ProfilesPersistenceProto.KeyValueUnit; +import io.mishmash.opentelemetry.persistence.proto.v1.ProfilesPersistenceProto.PersistedProfile; +import io.mishmash.opentelemetry.persistence.proto.v1.ProfilesPersistenceProto.StrFunction; +import io.mishmash.opentelemetry.persistence.proto.v1.ProfilesPersistenceProto.StrLabel; +import io.mishmash.opentelemetry.persistence.proto.v1.ProfilesPersistenceProto.StrLine; +import io.mishmash.opentelemetry.persistence.proto.v1.ProfilesPersistenceProto.StrLocation; +import io.mishmash.opentelemetry.persistence.proto.v1.ProfilesPersistenceProto.StrMapping; +import io.mishmash.opentelemetry.persistence.proto.v1.ProfilesPersistenceProto.StrValueType; +import io.mishmash.opentelemetry.server.collector.ProfileSampleValue; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.profiles.v1experimental.AttributeUnit; +import io.opentelemetry.proto.profiles.v1experimental.Function; +import io.opentelemetry.proto.profiles.v1experimental.Label; +import io.opentelemetry.proto.profiles.v1experimental.Line; +import io.opentelemetry.proto.profiles.v1experimental.Link; +import io.opentelemetry.proto.profiles.v1experimental.Location; +import io.opentelemetry.proto.profiles.v1experimental.Mapping; +import io.opentelemetry.proto.profiles.v1experimental.Profile; +import io.opentelemetry.proto.profiles.v1experimental.ProfileContainer; +import io.opentelemetry.proto.profiles.v1experimental.Sample; +import io.opentelemetry.proto.profiles.v1experimental.ValueType; + +/** + * Utility class to help with protobuf serialization of + * {@link ProfileSampleValue} instances. + */ +public final class ProtobufProfiles { + + private ProtobufProfiles() { + // constructor is hidden + } + + /** + * Get a protobuf representation of a + * {@link ProfileSampleValue}. + * + * @param profile the profile sample value + * @return a populated {@link PersistedProfile.Builder} + */ + public static PersistedProfile.Builder buildProfileSampleValue( + final ProfileSampleValue profile) { + PersistedProfile.Builder builder = + PersistedProfile.newBuilder() + .setBatchTimestamp(profile.getBatchTimestamp()) + .setBatchUUID(profile.getBatchUUID()) + .setResourceSeqNo( + profile.getResourceProfileSeqNo()) + .setScopeSeqNo(profile.getScopeProfileSeqNo()) + .setProfileSeqNo(profile.getProfileSeqNo()) + .setSampleSeqNo(profile.getSampleSeqNo()) + .setValueSeqNo(profile.getValueSeqNo()) + .setIsValid(profile.isValid()); + + if (profile.getErrorMessage() != null) { + builder = builder + .setErrorMessage(profile.getErrorMessage()); + } + + if (profile.getResource() != null) { + builder = builder + .addAllResourceAttributes( + profile.getResource().getAttributesList()) + .setResourceDroppedAttributesCount( + profile.getResource() + .getDroppedAttributesCount()); + } + + if (profile.getResourceSchemaUrl() != null) { + builder = builder + .setResourceSchemaUrl( + profile.getResourceSchemaUrl()); + } + + if (profile.getScope() != null) { + builder = builder + .setScopeName(profile.getScope().getName()) + .setScopeVersion(profile.getScope().getVersion()) + .addAllScopeAttributes( + profile.getScope().getAttributesList()) + .setScopeDroppedAttributesCount( + profile.getScope() + .getDroppedAttributesCount()); + } + + if (profile.getProfileSchemaUrl() != null) { + builder = builder + .setProfileSchemaUrl(profile.getProfileSchemaUrl()); + } + + if (profile.getProfileContainer() != null) { + ProfileContainer container = profile.getProfileContainer(); + + builder = builder + .setProfileId(container.getProfileId()) + .setStartTimeUnixNano( + container.getStartTimeUnixNano()) + .setEndTimeUnixNano(container.getEndTimeUnixNano()) + .addAllProfileAttributes( + container.getAttributesList()) + .setProfileDroppedAttributesCount( + container.getDroppedAttributesCount()) + .setOriginalPayloadFormat( + container.getOriginalPayloadFormat()) + .setOriginalPayload( + container.getOriginalPayload()); + } + + Profile p = profile.getProfile(); + if (profile.getProfile() != null) { + builder = builder + .setDropFrames(getStrAt(p, p.getDropFrames())) + .setKeepFrames(getStrAt(p, p.getKeepFrames())) + .setTimeNanos(p.getTimeNanos()) + .setDurationNanos(p.getDurationNanos()) + .setPeriodType(toStr(p, p.getPeriodType())) + .setPeriod(p.getPeriod()) + .addAllComment(toStr(p, p.getCommentList())) + .setDefaultSampleType( + getStrAt(p, p.getDefaultSampleType())); + } + + if (profile.getSample() != null) { + Sample s = profile.getSample(); + + builder = builder + .setStacktraceId( + getStrAt(p, s.getStacktraceIdIndex())) + .addAllLocations(resolveLocations(p, s)) + .addAllLabels(resolveLabels(p, s.getLabelList())) + .addAllAttributes( + resolveAttributes( + p, + s.getAttributesList())) + .addAllTimestampsUnixNano( + s.getTimestampsUnixNanoList()) + .setType( + toStr( + p, + p.getSampleType( + profile.getValueSeqNo()))); + + Link l = p.getLinkTableCount() == 0 + ? null + : p.getLinkTable((int) s.getLink()); + if (l != null) { + builder = builder + .setTraceId(l.getTraceId()) + .setSpanId(l.getSpanId()); + } + } + + builder = builder.setValue(profile.getValue()); + + return builder; + } + + /** + * Return the string contained in this profile at a given index. + * + * @param profile the OTLP profile + * @param index the index + * @return the string at that index + */ + public static String getStrAt(final Profile profile, final long index) { + return profile.getStringTable((int) index); + } + + /** + * Convert a ValueType to StrValueType, resolving all strings. + * + * @param p an OTLP Profile + * @param vt an OTLP ValueType + * @return a converted value type + */ + public static StrValueType.Builder toStr( + final Profile p, + final ValueType vt) { + return StrValueType.newBuilder() + .setType(getStrAt(p, vt.getType())) + .setUnit(getStrAt(p, vt.getUnit())) + .setAggregationTemporality(vt.getAggregationTemporality()); + } + + /** + * Resolve an OTLP Mapping. + * + * @param p the OTLP Profile, used for resolution + * @param mapping the OTLP Mapping + * @return a resolved mapping + */ + public static StrMapping.Builder resolve( + final Profile p, + final Mapping mapping) { + return StrMapping.newBuilder() + .setMemoryStart(mapping.getMemoryStart()) + .setMemoryLimit(mapping.getMemoryLimit()) + .setFileOffset(mapping.getFileOffset()) + .setFilename(getStrAt(p, mapping.getFilename())) + .setBuildId(getStrAt(p, mapping.getBuildId())) + .setBuildIdKind(mapping.getBuildIdKind()) + .addAllAttributes( + resolveAttributes(p, mapping.getAttributesList())) + .setHasFunctions(mapping.getHasFunctions()) + .setHasFilenames(mapping.getHasFilenames()) + .setHasLineNumbers(mapping.getHasLineNumbers()) + .setHasInlineFrames(mapping.getHasInlineFrames()); + } + + /** + * Resolve an OTLP Function. + * + * @param p the OTLP Profile, used for resolution + * @param f the OTLP Function + * @return a function with resolved strings and values + */ + public static StrFunction.Builder resolve( + final Profile p, + final Function f) { + return StrFunction.newBuilder() + .setName(getStrAt(p, f.getName())) + .setSystemName(getStrAt(p, f.getSystemName())) + .setFilename(getStrAt(p, f.getFilename())) + .setStartLine(f.getStartLine()); + } + + /** + * Resolve an OTLP Line. + * + * @param p the OTLP Profile, used for resolution + * @param l the OTLP Line + * @return a line with resolved strings and values + */ + public static StrLine.Builder resolve( + final Profile p, + final Line l) { + return StrLine.newBuilder() + .setFunction( + resolve( + p, + p.getFunction((int) l.getFunctionIndex()))) + .setLine(l.getLine()) + .setColumn(l.getColumn()); + } + + /** + * Convert an OTLP Location by resolving all indexes. + * + * @param p the OTLP Profile, used for resolution + * @param location the OTLP Location + * @return a resolved location + */ + public static StrLocation.Builder resolve( + final Profile p, + final Location location) { + return StrLocation.newBuilder() + .setMapping( + resolve( + p, + p.getMapping((int) location.getMappingIndex()))) + .setAddress(location.getAddress()) + .addAllLines(resolveLines(p, location.getLineList())) + .setIsFolded(location.getIsFolded()) + .setType(getStrAt(p, location.getTypeIndex())) + .addAllAttributes( + resolveAttributes(p, location.getAttributesList())); + } + + /** + * Resolve an OTLP Label. + * + * @param p the OTLP Profile used for resolution + * @param label the OTLP Label + * @return a resolved label + */ + public static StrLabel.Builder resolve( + final Profile p, + final Label label) { + StrLabel.Builder builder = StrLabel.newBuilder() + .setKey(getStrAt(p, label.getKey())); + + if (label.getNum() > 0) { + builder = builder + .setNum(label.getNum()) + .setNumUnit(getStrAt(p, label.getNumUnit())); + } else { + builder = builder + .setStr(getStrAt(p, label.getStr())); + } + + return builder; + } + + /** + * Resolve an OTLP Sample attribute, adding an attribute unit if found. + * + * @param p the OTLP Profile used for lookup + * @param attr the Sample attribute + * @return an attribute with a unit (if present) + */ + public static KeyValueUnit.Builder resolve( + final Profile p, + final KeyValue attr) { + String key = attr.getKey(); + KeyValueUnit.Builder builder = KeyValueUnit.newBuilder() + .setKey(key) + .setValue(attr.getValue()); + + for (AttributeUnit u : p.getAttributeUnitsList()) { + String unit = getStrAt(p, u.getAttributeKey()); + + if (key.equals(unit)) { + builder = builder + .setUnit(unit); + + break; + } + } + + return builder; + } + + /** + * Get all locations contained in an OTLP Sample and convert them. + * + * @param p the OTLP Profile, used for resolution + * @param s the OTLP Sample + * @return a list of all contained locations, resolved + */ + public static List resolveLocations( + final Profile p, + final Sample s) { + if (s.getLocationIndexCount() > 0) { + return s.getLocationIndexList().stream() + .map(l -> p.getLocation(l.intValue())) + .map(l -> resolve(p, l)) + .map(l -> l.build()) + .toList(); + } else if (s.getLocationsLength() > 0) { + ArrayList res = + new ArrayList<>((int) s.getLocationsLength()); + + int startIdx = (int) s.getLocationsStartIndex(); + for (int idx = startIdx; + idx < startIdx + (int) s.getLocationsLength(); + idx++) { + res.add( + resolve(p, p.getLocation(idx)) + .build()); + } + + return res; + } else { + // couldn't extract locations? + return Collections.emptyList(); + } + } + + /** + * Resolve a list of OTLP Labels. + * + * @param p the OTLP Profile to use for resolution + * @param labels the list of OTLP Labels + * @return a list of resolved labels + */ + public static List resolveLabels( + final Profile p, + final List

    Flattened protobuf format for OpenTelemetry signals

    + *

    + * This package contains the classes used by the embedded OpenTelemetry + * servers to persist 'flattened' telemetry records. Find out more about the + * embedded servers (and how they 'flatten' OpenTelemetry data) + * on the GitHub link below. + *

    + *

    + * This package is part of a broader set of OpenTelemetry-related activities + * at mishmash io. To find out more about this and other packages visit the + * links below. + *

    + * @see + * OpenTelemetry at mishmash io + * @see + * This package on GitHub + * @see mishmash io + */ +package io.mishmash.opentelemetry.persistence.proto; diff --git a/server-parquet/src/main/proto/io/mishmash/opentelemetry/server/persistence/proto/v1/logs_persistence.proto b/persistence-protobuf/src/main/proto/io/mishmash/opentelemetry/persistence/proto/v1/logs_persistence.proto similarity index 95% rename from server-parquet/src/main/proto/io/mishmash/opentelemetry/server/persistence/proto/v1/logs_persistence.proto rename to persistence-protobuf/src/main/proto/io/mishmash/opentelemetry/persistence/proto/v1/logs_persistence.proto index 216a714..862c6ec 100644 --- a/server-parquet/src/main/proto/io/mishmash/opentelemetry/server/persistence/proto/v1/logs_persistence.proto +++ b/persistence-protobuf/src/main/proto/io/mishmash/opentelemetry/persistence/proto/v1/logs_persistence.proto @@ -16,12 +16,12 @@ syntax = "proto3"; -package io.mishmash.opentelemetry.server.parquet.persistence.proto.v1; +package io.mishmash.opentelemetry.persistence.proto.v1; import "opentelemetry/proto/common/v1/common.proto"; import "opentelemetry/proto/logs/v1/logs.proto"; -option java_package = "io.mishmash.opentelemetry.server.parquet.persistence.proto.v1"; +option java_package = "io.mishmash.opentelemetry.persistence.proto.v1"; option java_outer_classname = "LogsPersistenceProto"; message PersistedLog { diff --git a/server-parquet/src/main/proto/io/mishmash/opentelemetry/server/persistence/proto/v1/metrics_persistence.proto b/persistence-protobuf/src/main/proto/io/mishmash/opentelemetry/persistence/proto/v1/metrics_persistence.proto similarity index 97% rename from server-parquet/src/main/proto/io/mishmash/opentelemetry/server/persistence/proto/v1/metrics_persistence.proto rename to persistence-protobuf/src/main/proto/io/mishmash/opentelemetry/persistence/proto/v1/metrics_persistence.proto index bf0594b..2bbe8b1 100644 --- a/server-parquet/src/main/proto/io/mishmash/opentelemetry/server/persistence/proto/v1/metrics_persistence.proto +++ b/persistence-protobuf/src/main/proto/io/mishmash/opentelemetry/persistence/proto/v1/metrics_persistence.proto @@ -16,12 +16,12 @@ syntax = "proto3"; -package io.mishmash.opentelemetry.server.parquet.persistence.proto.v1; +package io.mishmash.opentelemetry.persistence.proto.v1; import "opentelemetry/proto/common/v1/common.proto"; import "opentelemetry/proto/metrics/v1/metrics.proto"; -option java_package = "io.mishmash.opentelemetry.server.parquet.persistence.proto.v1"; +option java_package = "io.mishmash.opentelemetry.persistence.proto.v1"; option java_outer_classname = "MetricsPersistenceProto"; message PersistedMetric { diff --git a/server-parquet/src/main/proto/io/mishmash/opentelemetry/server/persistence/proto/v1/profiles_persistence.proto b/persistence-protobuf/src/main/proto/io/mishmash/opentelemetry/persistence/proto/v1/profiles_persistence.proto similarity index 97% rename from server-parquet/src/main/proto/io/mishmash/opentelemetry/server/persistence/proto/v1/profiles_persistence.proto rename to persistence-protobuf/src/main/proto/io/mishmash/opentelemetry/persistence/proto/v1/profiles_persistence.proto index cec62c4..dd54b61 100644 --- a/server-parquet/src/main/proto/io/mishmash/opentelemetry/server/persistence/proto/v1/profiles_persistence.proto +++ b/persistence-protobuf/src/main/proto/io/mishmash/opentelemetry/persistence/proto/v1/profiles_persistence.proto @@ -16,13 +16,13 @@ syntax = "proto3"; -package io.mishmash.opentelemetry.server.parquet.persistence.proto.v1; +package io.mishmash.opentelemetry.persistence.proto.v1; import "opentelemetry/proto/common/v1/common.proto"; import "opentelemetry/proto/profiles/v1experimental/profiles.proto"; import "opentelemetry/proto/profiles/v1experimental/pprofextended.proto"; -option java_package = "io.mishmash.opentelemetry.server.parquet.persistence.proto.v1"; +option java_package = "io.mishmash.opentelemetry.persistence.proto.v1"; option java_outer_classname = "ProfilesPersistenceProto"; message PersistedProfile { diff --git a/server-parquet/src/main/proto/io/mishmash/opentelemetry/server/persistence/proto/v1/traces_persistence.proto b/persistence-protobuf/src/main/proto/io/mishmash/opentelemetry/persistence/proto/v1/traces_persistence.proto similarity index 95% rename from server-parquet/src/main/proto/io/mishmash/opentelemetry/server/persistence/proto/v1/traces_persistence.proto rename to persistence-protobuf/src/main/proto/io/mishmash/opentelemetry/persistence/proto/v1/traces_persistence.proto index 0d8c4e1..a330d13 100644 --- a/server-parquet/src/main/proto/io/mishmash/opentelemetry/server/persistence/proto/v1/traces_persistence.proto +++ b/persistence-protobuf/src/main/proto/io/mishmash/opentelemetry/persistence/proto/v1/traces_persistence.proto @@ -16,12 +16,12 @@ syntax = "proto3"; -package io.mishmash.opentelemetry.server.parquet.persistence.proto.v1; +package io.mishmash.opentelemetry.persistence.proto.v1; import "opentelemetry/proto/common/v1/common.proto"; import "opentelemetry/proto/trace/v1/trace.proto"; -option java_package = "io.mishmash.opentelemetry.server.parquet.persistence.proto.v1"; +option java_package = "io.mishmash.opentelemetry.persistence.proto.v1"; option java_outer_classname = "TracesPersistenceProto"; message PersistedSpan { diff --git a/pom.xml b/pom.xml index 7932647..8a50716 100644 --- a/pom.xml +++ b/pom.xml @@ -77,6 +77,7 @@ collector-embedded + persistence-protobuf server-parquet diff --git a/server-parquet/pom.xml b/server-parquet/pom.xml index 421d00a..3aee2c2 100644 --- a/server-parquet/pom.xml +++ b/server-parquet/pom.xml @@ -87,28 +87,6 @@ - - org.xolstice.maven.plugins - protobuf-maven-plugin - 0.6.1 - - - com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} - - grpc-java - - - - compile - - - - compile - - - - - org.apache.maven.plugins maven-compiler-plugin @@ -120,9 +98,6 @@ org.apache.maven.plugins maven-javadoc-plugin - - io.mishmash.opentelemetry.server.parquet.persistence.proto.v1 - org.apache.maven.plugins @@ -141,6 +116,11 @@ collector-embedded ${project.version} + + io.mishmash.opentelemetry + persistence-protobuf + ${project.version} + org.apache.parquet parquet-protobuf diff --git a/server-parquet/src/main/java/io/mishmash/opentelemetry/server/parquet/FileLogs.java b/server-parquet/src/main/java/io/mishmash/opentelemetry/server/parquet/FileLogs.java index a636661..e00c04e 100644 --- a/server-parquet/src/main/java/io/mishmash/opentelemetry/server/parquet/FileLogs.java +++ b/server-parquet/src/main/java/io/mishmash/opentelemetry/server/parquet/FileLogs.java @@ -25,12 +25,12 @@ import io.mishmash.opentelemetry.server.collector.Instrumentation; import io.mishmash.opentelemetry.server.collector.Log; import io.mishmash.opentelemetry.server.collector.LogsSubscriber; -import io.mishmash.opentelemetry.server.parquet.persistence.proto.v1.LogsPersistenceProto.PersistedLog; +import io.mishmash.opentelemetry.persistence.proto.ProtobufLogs; +import io.mishmash.opentelemetry.persistence.proto.v1.LogsPersistenceProto.PersistedLog; import io.opentelemetry.api.metrics.LongCounter; import io.opentelemetry.api.metrics.ObservableLongGauge; import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Scope; -import io.opentelemetry.proto.common.v1.AnyValue; /** * Subscribes to incoming OpenTelemetry logs and writes them to parquet @@ -191,94 +191,10 @@ public void onNext(final Log log) { try { span.addEvent("Build persisted log"); - PersistedLog.Builder builder = PersistedLog.newBuilder() - .setBatchTimestamp(log.getBatchTimestamp()) - .setBatchUUID(log.getBatchUUID()) - .setSeqNo(log.getSeqNo()) - .setIsValid(log.isValid()); - - if (log.getErrorMessage() != null) { - builder = builder.setErrorMessage(log.getErrorMessage()); - } - - if (log.getResource() != null) { - builder = builder - .addAllResourceAttributes( - log.getResource().getAttributesList()) - .setResourceDroppedAttributesCount( - log.getResource().getDroppedAttributesCount()); - } - - if (log.getResourceSchemaUrl() != null) { - builder = builder - .setResourceSchemaUrl(log.getResourceSchemaUrl()); - } - - if (log.getScope() != null) { - builder = builder - .setScopeName(log.getScope().getName()) - .setScopeVersion(log.getScope().getVersion()) - .addAllScopeAttributes( - log.getScope().getAttributesList()) - .setScopeDroppedAttributesCount( - log.getScope().getDroppedAttributesCount()); - } - - if (log.getLog() != null) { - builder = builder - .setTimeUnixNano(log.getLog().getTimeUnixNano()) - .setObservedTimeUnixNano( - log.getLog().getObservedTimeUnixNano()) - .setSeverityNumber(log.getLog().getSeverityNumber()) - .setSeverityText(log.getLog().getSeverityText()) - .addAllAttributes(log.getLog().getAttributesList()) - .setDroppedAttributesCount( - log.getLog().getDroppedAttributesCount()) - .setFlags(log.getLog().getFlags()) - .setTraceId(log.getLog().getTraceId()) - .setSpanId(log.getLog().getSpanId()); - - AnyValue body = log.getLog().getBody(); - - builder = builder.setBodyType(body.getValueCase().name()); - - switch (body.getValueCase()) { - case ARRAY_VALUE: - builder = builder.setBodyArray(body.getArrayValue()); - break; - case BOOL_VALUE: - builder = builder.setBodyBool(body.getBoolValue()); - break; - case BYTES_VALUE: - builder = builder.setBodyBytes(body.getBytesValue()); - break; - case DOUBLE_VALUE: - builder = builder.setBodyDouble(body.getDoubleValue()); - break; - case INT_VALUE: - builder = builder.setBodyInt(body.getIntValue()); - break; - case KVLIST_VALUE: - builder = builder.setBodyKvlist(body.getKvlistValue()); - break; - case STRING_VALUE: - builder = builder.setBodyString(body.getStringValue()); - break; - case VALUE_NOT_SET: - // FIXME: what to do when not set? - break; - default: - // FIXME: should not ignore - break; - } - } - - if (log.getLogSchemaUrl() != null) { - builder = builder.setLogSchemaUrl(log.getLogSchemaUrl()); - } - span.addEvent("Write persisted log to file"); - parquet.write(builder.build()); + parquet.write(ProtobufLogs + .buildLog(log) + .build()); numWritten.add(1); diff --git a/server-parquet/src/main/java/io/mishmash/opentelemetry/server/parquet/FileMetrics.java b/server-parquet/src/main/java/io/mishmash/opentelemetry/server/parquet/FileMetrics.java index b06bbf3..2fd435d 100644 --- a/server-parquet/src/main/java/io/mishmash/opentelemetry/server/parquet/FileMetrics.java +++ b/server-parquet/src/main/java/io/mishmash/opentelemetry/server/parquet/FileMetrics.java @@ -25,15 +25,12 @@ import io.mishmash.opentelemetry.server.collector.Instrumentation; import io.mishmash.opentelemetry.server.collector.MetricDataPoint; import io.mishmash.opentelemetry.server.collector.MetricsSubscriber; -import io.mishmash.opentelemetry.server.parquet.persistence.proto.v1.MetricsPersistenceProto.PersistedMetric; +import io.mishmash.opentelemetry.persistence.proto.ProtobufMetrics; +import io.mishmash.opentelemetry.persistence.proto.v1.MetricsPersistenceProto.PersistedMetric; import io.opentelemetry.api.metrics.LongCounter; import io.opentelemetry.api.metrics.ObservableLongGauge; import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Scope; -import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint; -import io.opentelemetry.proto.metrics.v1.HistogramDataPoint; -import io.opentelemetry.proto.metrics.v1.NumberDataPoint; -import io.opentelemetry.proto.metrics.v1.SummaryDataPoint; /** * Subscribes to incoming OpenTelemetry metrics and writes them to parquet @@ -200,170 +197,10 @@ public void onNext(final MetricDataPoint metric) { try { span.addEvent("Build persisted metric"); - PersistedMetric.Builder builder = PersistedMetric.newBuilder() - .setBatchTimestamp(metric.getBatchTimestamp()) - .setBatchUUID(metric.getBatchUUID()) - .setSeqNo(metric.getSeqNo()) - .setIsValid(metric.isValid()) - .setDatapointSeqNo(metric.getDatapointSeqNo()); - - if (metric.getErrorMessage() != null) { - builder = builder - .setErrorMessage(metric.getErrorMessage()); - } - - if (metric.getResource() != null) { - builder = builder - .addAllResourceAttributes( - metric.getResource().getAttributesList()) - .setResourceDroppedAttributesCount( - metric.getResource() - .getDroppedAttributesCount()); - } - - if (metric.getResourceSchemaUrl() != null) { - builder = builder - .setResourceSchemaUrl( - metric.getResourceSchemaUrl()); - } - - if (metric.getScope() != null) { - builder = builder - .setScopeName(metric.getScope().getName()) - .setScopeVersion(metric.getScope().getVersion()) - .addAllScopeAttributes( - metric.getScope().getAttributesList()) - .setScopeDroppedAttributesCount( - metric.getScope() - .getDroppedAttributesCount()); - } - - if (metric.getName() != null) { - builder = builder.setName(metric.getName()); - } - - if (metric.getDescription() != null) { - builder = builder.setDescription(metric.getDescription()); - } - - if (metric.getUnit() != null) { - builder = builder.setUnit(metric.getUnit()); - } - - if (metric.getType() != null) { - builder = builder.setType(metric.getType().name()); - } - - if (metric.getMetaData() != null) { - builder = builder - .addAllMetricMetadata(metric.getMetaData()); - } - - switch (metric.getType()) { - case EXPONENTIAL_HISTOGRAM: - ExponentialHistogramDataPoint ehdp = - metric.getExponentialHistogram(); - - builder = builder - .addAllAttributes(ehdp.getAttributesList()) - .setStartTimeUnixNano(ehdp.getStartTimeUnixNano()) - .setTimeUnixNano(ehdp.getTimeUnixNano()) - .addAllExemplars(ehdp.getExemplarsList()) - .setFlags(ehdp.getFlags()) - .setExponentialHistogramCount(ehdp.getCount()) - .setExponentialHistogramSum(ehdp.getSum()) - .setExponentialHistogramScale(ehdp.getScale()) - .setExponentialHistogramZeroCount(ehdp.getZeroCount()) - .setExponentialHistogramPositive(ehdp.getPositive()) - .setExponentialHistogramNegative(ehdp.getNegative()) - .setExponentialHistogramMin(ehdp.getMin()) - .setExponentialHistogramMax(ehdp.getMax()) - .setExponentialHistogramZeroThreshold( - ehdp.getZeroThreshold()) - .setAggregationTemporality( - metric.getAggregationTemporality()); - break; - case GAUGE: - NumberDataPoint gdp = metric.getGauge(); - - builder = builder - .addAllAttributes(gdp.getAttributesList()) - .setStartTimeUnixNano( - gdp.getStartTimeUnixNano()) - .setTimeUnixNano(gdp.getTimeUnixNano()) - .addAllExemplars(gdp.getExemplarsList()) - .setFlags(gdp.getFlags()) - .setGaugeType(gdp.getValueCase().name()) - .setGaugeDouble(gdp.getAsDouble()) - .setGaugeInt(gdp.getAsInt()); - break; - case HISTOGRAM: - HistogramDataPoint hdp = metric.getHistogram(); - - builder = builder - .addAllAttributes(hdp.getAttributesList()) - .setStartTimeUnixNano( - hdp.getStartTimeUnixNano()) - .setTimeUnixNano(hdp.getTimeUnixNano()) - .addAllExemplars(hdp.getExemplarsList()) - .setFlags(hdp.getFlags()) - .setHistogramCount(hdp.getCount()) - .setHistogramSum(hdp.getSum()) - .addAllHistogramBucketCounts( - hdp.getBucketCountsList()) - .addAllHistogramExplicitBounds( - hdp.getExplicitBoundsList()) - .setHistogramMin(hdp.getMin()) - .setHistogramMax(hdp.getMax()) - .setAggregationTemporality( - metric.getAggregationTemporality()); - break; - case SUM: - NumberDataPoint ndp = metric.getSum(); - - builder = builder - .addAllAttributes(ndp.getAttributesList()) - .setStartTimeUnixNano( - ndp.getStartTimeUnixNano()) - .setTimeUnixNano(ndp.getTimeUnixNano()) - .addAllExemplars(ndp.getExemplarsList()) - .setFlags(ndp.getFlags()) - .setSumType(ndp.getValueCase().name()) - .setSumDouble(ndp.getAsDouble()) - .setSumInt(ndp.getAsInt()) - .setAggregationTemporality( - metric.getAggregationTemporality()) - .setIsMonotonic(metric.isMonotonic()); - break; - case SUMMARY: - SummaryDataPoint sdp = metric.getSummary(); - - builder = builder - .addAllAttributes(sdp.getAttributesList()) - .setStartTimeUnixNano( - sdp.getStartTimeUnixNano()) - .setTimeUnixNano(sdp.getTimeUnixNano()) - .setFlags(sdp.getFlags()) - .setSummaryCount(sdp.getCount()) - .setSummarySum(sdp.getSum()) - .addAllSummaryQuantileValues( - sdp.getQuantileValuesList()); - break; - case DATA_NOT_SET: - // FIXME: skip for now - break; - default: - // FIXME: unknown type... - break; - } - - if (metric.getMetricSchemaUrl() != null) { - builder = builder - .setMetricSchemaUrl(metric.getMetricSchemaUrl()); - } - span.addEvent("Write persisted metric to file"); - parquet.write(builder.build()); + parquet.write(ProtobufMetrics + .buildMetricDataPoint(metric) + .build()); numWritten.add(1); diff --git a/server-parquet/src/main/java/io/mishmash/opentelemetry/server/parquet/FileProfiles.java b/server-parquet/src/main/java/io/mishmash/opentelemetry/server/parquet/FileProfiles.java index 1e8591c..dde3702 100644 --- a/server-parquet/src/main/java/io/mishmash/opentelemetry/server/parquet/FileProfiles.java +++ b/server-parquet/src/main/java/io/mishmash/opentelemetry/server/parquet/FileProfiles.java @@ -18,9 +18,6 @@ package io.mishmash.opentelemetry.server.parquet; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; import java.util.concurrent.Flow.Subscription; import java.util.logging.Level; import java.util.logging.Logger; @@ -32,26 +29,8 @@ import io.opentelemetry.api.metrics.ObservableLongGauge; import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Scope; -import io.opentelemetry.proto.common.v1.KeyValue; -import io.opentelemetry.proto.profiles.v1experimental.AttributeUnit; -import io.opentelemetry.proto.profiles.v1experimental.Function; -import io.opentelemetry.proto.profiles.v1experimental.Label; -import io.opentelemetry.proto.profiles.v1experimental.Line; -import io.opentelemetry.proto.profiles.v1experimental.Link; -import io.opentelemetry.proto.profiles.v1experimental.Location; -import io.opentelemetry.proto.profiles.v1experimental.Mapping; -import io.opentelemetry.proto.profiles.v1experimental.Profile; -import io.opentelemetry.proto.profiles.v1experimental.ProfileContainer; -import io.opentelemetry.proto.profiles.v1experimental.Sample; -import io.opentelemetry.proto.profiles.v1experimental.ValueType; -import io.mishmash.opentelemetry.server.parquet.persistence.proto.v1.ProfilesPersistenceProto.KeyValueUnit; -import io.mishmash.opentelemetry.server.parquet.persistence.proto.v1.ProfilesPersistenceProto.PersistedProfile; -import io.mishmash.opentelemetry.server.parquet.persistence.proto.v1.ProfilesPersistenceProto.StrFunction; -import io.mishmash.opentelemetry.server.parquet.persistence.proto.v1.ProfilesPersistenceProto.StrLabel; -import io.mishmash.opentelemetry.server.parquet.persistence.proto.v1.ProfilesPersistenceProto.StrLine; -import io.mishmash.opentelemetry.server.parquet.persistence.proto.v1.ProfilesPersistenceProto.StrLocation; -import io.mishmash.opentelemetry.server.parquet.persistence.proto.v1.ProfilesPersistenceProto.StrMapping; -import io.mishmash.opentelemetry.server.parquet.persistence.proto.v1.ProfilesPersistenceProto.StrValueType; +import io.mishmash.opentelemetry.persistence.proto.ProtobufProfiles; +import io.mishmash.opentelemetry.persistence.proto.v1.ProfilesPersistenceProto.PersistedProfile; /** * Subscribes to incoming OpenTelemetry profiles and writes them to parquet @@ -219,120 +198,10 @@ public void onNext(final ProfileSampleValue profile) { try { span.addEvent("Build persisted profile"); - PersistedProfile.Builder builder = - PersistedProfile.newBuilder() - .setBatchTimestamp(profile.getBatchTimestamp()) - .setBatchUUID(profile.getBatchUUID()) - .setResourceSeqNo( - profile.getResourceProfileSeqNo()) - .setScopeSeqNo(profile.getScopeProfileSeqNo()) - .setProfileSeqNo(profile.getProfileSeqNo()) - .setSampleSeqNo(profile.getSampleSeqNo()) - .setValueSeqNo(profile.getValueSeqNo()) - .setIsValid(profile.isValid()); - - if (profile.getErrorMessage() != null) { - builder = builder - .setErrorMessage(profile.getErrorMessage()); - } - - if (profile.getResource() != null) { - builder = builder - .addAllResourceAttributes( - profile.getResource().getAttributesList()) - .setResourceDroppedAttributesCount( - profile.getResource() - .getDroppedAttributesCount()); - } - - if (profile.getResourceSchemaUrl() != null) { - builder = builder - .setResourceSchemaUrl( - profile.getResourceSchemaUrl()); - } - - if (profile.getScope() != null) { - builder = builder - .setScopeName(profile.getScope().getName()) - .setScopeVersion(profile.getScope().getVersion()) - .addAllScopeAttributes( - profile.getScope().getAttributesList()) - .setScopeDroppedAttributesCount( - profile.getScope() - .getDroppedAttributesCount()); - } - - if (profile.getProfileSchemaUrl() != null) { - builder = builder - .setProfileSchemaUrl(profile.getProfileSchemaUrl()); - } - - if (profile.getProfileContainer() != null) { - ProfileContainer container = profile.getProfileContainer(); - - builder = builder - .setProfileId(container.getProfileId()) - .setStartTimeUnixNano( - container.getStartTimeUnixNano()) - .setEndTimeUnixNano(container.getEndTimeUnixNano()) - .addAllProfileAttributes( - container.getAttributesList()) - .setProfileDroppedAttributesCount( - container.getDroppedAttributesCount()) - .setOriginalPayloadFormat( - container.getOriginalPayloadFormat()) - .setOriginalPayload( - container.getOriginalPayload()); - } - - Profile p = profile.getProfile(); - if (profile.getProfile() != null) { - builder = builder - .setDropFrames(getStrAt(p, p.getDropFrames())) - .setKeepFrames(getStrAt(p, p.getKeepFrames())) - .setTimeNanos(p.getTimeNanos()) - .setDurationNanos(p.getDurationNanos()) - .setPeriodType(toStr(p, p.getPeriodType())) - .setPeriod(p.getPeriod()) - .addAllComment(toStr(p, p.getCommentList())) - .setDefaultSampleType( - getStrAt(p, p.getDefaultSampleType())); - } - - if (profile.getSample() != null) { - Sample s = profile.getSample(); - - builder = builder - .setStacktraceId( - getStrAt(p, s.getStacktraceIdIndex())) - .addAllLocations(resolveLocations(p, s)) - .addAllLabels(resolveLabels(p, s.getLabelList())) - .addAllAttributes( - resolveAttributes( - p, - s.getAttributesList())) - .addAllTimestampsUnixNano( - s.getTimestampsUnixNanoList()) - .setType( - toStr( - p, - p.getSampleType( - profile.getValueSeqNo()))); - - Link l = p.getLinkTableCount() == 0 - ? null - : p.getLinkTable((int) s.getLink()); - if (l != null) { - builder = builder - .setTraceId(l.getTraceId()) - .setSpanId(l.getSpanId()); - } - } - - builder = builder.setValue(profile.getValue()); - span.addEvent("Write persisted profile to file"); - parquet.write(builder.build()); + parquet.write(ProtobufProfiles + .buildProfileSampleValue(profile) + .build()); numWritten.add(1); @@ -357,265 +226,4 @@ public void onSubscribe(final Subscription newSubscription) { subscription.request(Long.MAX_VALUE); } - - /** - * Return the string contained in this profile at a given index. - * - * @param profile the OTLP profile - * @param index the index - * @return the string at that index - */ - private String getStrAt(final Profile profile, final long index) { - return profile.getStringTable((int) index); - } - - /** - * Convert a ValueType to StrValueType, resolving all strings. - * - * @param p an OTLP Profile - * @param vt an OTLP ValueType - * @return a converted value type - */ - private StrValueType.Builder toStr(final Profile p, final ValueType vt) { - return StrValueType.newBuilder() - .setType(getStrAt(p, vt.getType())) - .setUnit(getStrAt(p, vt.getUnit())) - .setAggregationTemporality(vt.getAggregationTemporality()); - } - - /** - * Resolve an OTLP Mapping. - * - * @param p the OTLP Profile, used for resolution - * @param mapping the OTLP Mapping - * @return a resolved mapping - */ - private StrMapping.Builder resolve( - final Profile p, - final Mapping mapping) { - return StrMapping.newBuilder() - .setMemoryStart(mapping.getMemoryStart()) - .setMemoryLimit(mapping.getMemoryLimit()) - .setFileOffset(mapping.getFileOffset()) - .setFilename(getStrAt(p, mapping.getFilename())) - .setBuildId(getStrAt(p, mapping.getBuildId())) - .setBuildIdKind(mapping.getBuildIdKind()) - .addAllAttributes( - resolveAttributes(p, mapping.getAttributesList())) - .setHasFunctions(mapping.getHasFunctions()) - .setHasFilenames(mapping.getHasFilenames()) - .setHasLineNumbers(mapping.getHasLineNumbers()) - .setHasInlineFrames(mapping.getHasInlineFrames()); - } - - /** - * Resolve an OTLP Function. - * - * @param p the OTLP Profile, used for resolution - * @param f the OTLP Function - * @return a function with resolved strings and values - */ - private StrFunction.Builder resolve( - final Profile p, - final Function f) { - return StrFunction.newBuilder() - .setName(getStrAt(p, f.getName())) - .setSystemName(getStrAt(p, f.getSystemName())) - .setFilename(getStrAt(p, f.getFilename())) - .setStartLine(f.getStartLine()); - } - - /** - * Resolve an OTLP Line. - * - * @param p the OTLP Profile, used for resolution - * @param l the OTLP Line - * @return a line with resolved strings and values - */ - private StrLine.Builder resolve( - final Profile p, - final Line l) { - return StrLine.newBuilder() - .setFunction( - resolve( - p, - p.getFunction((int) l.getFunctionIndex()))) - .setLine(l.getLine()) - .setColumn(l.getColumn()); - } - - /** - * Convert an OTLP Location by resolving all indexes. - * - * @param p the OTLP Profile, used for resolution - * @param location the OTLP Location - * @return a resolved location - */ - private StrLocation.Builder resolve( - final Profile p, - final Location location) { - return StrLocation.newBuilder() - .setMapping( - resolve( - p, - p.getMapping((int) location.getMappingIndex()))) - .setAddress(location.getAddress()) - .addAllLines(resolveLines(p, location.getLineList())) - .setIsFolded(location.getIsFolded()) - .setType(getStrAt(p, location.getTypeIndex())) - .addAllAttributes( - resolveAttributes(p, location.getAttributesList())); - } - - /** - * Resolve an OTLP Label. - * - * @param p the OTLP Profile used for resolution - * @param label the OTLP Label - * @return a resolved label - */ - private StrLabel.Builder resolve( - final Profile p, - final Label label) { - StrLabel.Builder builder = StrLabel.newBuilder() - .setKey(getStrAt(p, label.getKey())); - - if (label.getNum() > 0) { - builder = builder - .setNum(label.getNum()) - .setNumUnit(getStrAt(p, label.getNumUnit())); - } else { - builder = builder - .setStr(getStrAt(p, label.getStr())); - } - - return builder; - } - - /** - * Resolve an OTLP Sample attribute, adding an attribute unit if found. - * - * @param p the OTLP Profile used for lookup - * @param attr the Sample attribute - * @return an attribute with a unit (if present) - */ - private KeyValueUnit.Builder resolve( - final Profile p, - final KeyValue attr) { - String key = attr.getKey(); - KeyValueUnit.Builder builder = KeyValueUnit.newBuilder() - .setKey(key) - .setValue(attr.getValue()); - - for (AttributeUnit u : p.getAttributeUnitsList()) { - String unit = getStrAt(p, u.getAttributeKey()); - - if (key.equals(unit)) { - builder = builder - .setUnit(unit); - - break; - } - } - - return builder; - } - - /** - * Get all locations contained in an OTLP Sample and convert them. - * - * @param p the OTLP Profile, used for resolution - * @param s the OTLP Sample - * @return a list of all contained locations, resolved - */ - private List resolveLocations( - final Profile p, - final Sample s) { - if (s.getLocationIndexCount() > 0) { - return s.getLocationIndexList().stream() - .map(l -> p.getLocation(l.intValue())) - .map(l -> resolve(p, l)) - .map(l -> l.build()) - .toList(); - } else if (s.getLocationsLength() > 0) { - ArrayList res = - new ArrayList<>((int) s.getLocationsLength()); - - int startIdx = (int) s.getLocationsStartIndex(); - for (int idx = startIdx; - idx < startIdx + (int) s.getLocationsLength(); - idx++) { - res.add( - resolve(p, p.getLocation(idx)) - .build()); - } - - return res; - } else { - // couldn't extract locations? - return Collections.emptyList(); - } - } - - /** - * Resolve a list of OTLP Labels. - * - * @param p the OTLP Profile to use for resolution - * @param labels the list of OTLP Labels - * @return a list of resolved labels - */ - private List resolveLabels( - final Profile p, - final List