Skip to content

Commit

Permalink
Separate protobuf encoding from the parquet server
Browse files Browse the repository at this point in the history
  • Loading branch information
arusevm committed Sep 28, 2024
1 parent 001c9ce commit b91a6fc
Show file tree
Hide file tree
Showing 19 changed files with 1,124 additions and 774 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,18 @@
*
* Essentially it's used as a way of flow control - an OTLP client submits
* a 'batch' of logs, metrics or traces and each individual item might be
* processed by more than one {@link LogsSubscriber}, {@link MetricsSubscriber}
* or {@link SpansSubscriber}.
* processed by more than one {@link LogsSubscriber}, {@link MetricsSubscriber},
* {@link SpansSubscriber} or {@link ProfilesSubscriber}.
*
* A response (success or failure) cannot be returned to the client before
* the entire OTLP packet is processed by all subscribers, but each subscriber
* might work on its own pace.
*
* So, a {@link LogsCollector}, a {@link MetricsCollector} or a
* {@link TracesCollector} will create a batch of elements and subscribers,
* load it with data from an OTLP packet and 'delay' the response to the
* client until all the processing - of all elements by all subscribers -
* is done.
* So, a {@link LogsCollector}, a {@link MetricsCollector}, a
* {@link TracesCollector} or a {@link ProfilesCollector} will create a batch
* of elements and subscribers, load it with data from an OTLP packet and
* 'delay' the response to the client until all the processing - of all
* elements by all subscribers - is done.
*
* @param <T> the type of elements (or work items) of this batch
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@
import io.opentelemetry.context.Context;

/**
* A 'batch' of all {@link LogsSubscriber}s, {@link MetricsSubscriber}s
* or @{link SpansSubscribers} that were given the task to process
* an OpenTelemetry log record, metric data point or trace span.
* A 'batch' of all {@link LogsSubscriber}s, {@link MetricsSubscriber}s,
* {@link SpansSubscriber}s or {@link ProfilesSubscriber}s that were given
* the task to process an OpenTelemetry log record, metric data point or
* trace span.
*
* @param <T> the subscriber record type
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
* <p>
* This package contains the main classes used to embed OpenTelemetry
* collectors. Use one or all of {@link LogsCollector},
* {@link MetricsCollector} and {@link TracesCollector} to create
* an OTEL data source for your system.
* {@link MetricsCollector}, {@link TracesCollector} and
* {@link ProfilesCollector} to create an OTEL data source for your system.
* </p>
* <p>
* To use them:
Expand All @@ -31,8 +31,9 @@
* </li>
* <li>
* Implement one or more subscribers - {@link LogsSubscriber},
* {@link MetricsSubscriber}, {@link SpansSubscriber} to receive
* incoming OpenTelemetry signals of a given type
* {@link MetricsSubscriber}, {@link SpansSubscriber},
* {@link ProfilesSubscriber} to receive incoming OpenTelemetry signals
* of a given type
* </li>
* <li>
* Subscribe them to the collector -
Expand All @@ -47,11 +48,12 @@
*
* <p>
* When OTLP clients connect to the server and submit data - each individual
* data item in them - a log entry, a metric data point or a span - will be
* delivered to the subscribers' {@link LogsSubscriber#onNext(Log)},
* {@link MetricsSubscriber#onNext(MetricDataPoint)} or
* {@link SpansSubscriber#onNext(Span)} where you can make it available
* within the system you're embedding into.
* data item in them - a log entry, a metric data point, a span or a profile -
* will be delivered to the subscribers' {@link LogsSubscriber#onNext(Log)},
* {@link MetricsSubscriber#onNext(MetricDataPoint)},
* {@link SpansSubscriber#onNext(Span)} or
* {@link ProfilesSubscriber#onNext(ProfileSampleValue)} where you can make it
* available within the system you're embedding into.
* </p>
* <p>
* This package is part of a broader set of OpenTelemetry-related activities
Expand Down
142 changes: 142 additions & 0 deletions persistence-protobuf/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2024 Mishmash IO UK Ltd.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.mishmash.opentelemetry</groupId>
<artifactId>opentelemetry-server-embedded</artifactId>
<version>1.1.3</version>
</parent>

<artifactId>persistence-protobuf</artifactId>
<packaging>jar</packaging>

<name>Protobuf persistence format for extracted OpenTelemetry signals</name>
<description>
Utility classes for protobuf serialization of OpenTelemetry signals as
extracted and 'flattened' by the embedded collectors.
</description>
<url>https://mishmash.io/open_source/opentelemetry</url>

<licenses>
<license>
<name>The Apache Software License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
</license>
</licenses>

<organization>
<name>mishmash io</name>
<url>https://mishmash.io</url>
</organization>

<scm>
<connection>scm:git:https://github.com/mishmash-io/opentelemetry-server-embedded.git</connection>
<developerConnection>scm:git:https://github.com/mishmash-io/opentelemetry-server-embedded.git</developerConnection>
<url>https://github.com/mishmash-io/opentelemetry-server-embedded</url>
</scm>

<developers>
<developer>
<name>Ivan Kountchev</name>
<email>[email protected]</email>
<organization>mishmash io</organization>
<organizationUrl>https://mishmash.io</organizationUrl>
<roles>
<role>developer</role>
</roles>
</developer>
<developer>
<name>Andrey Rusev</name>
<email>[email protected]</email>
<url>www.linkedin.com/in/andrey-rusev-21894172</url>
<organization>mishmash io</organization>
<organizationUrl>https://mishmash.io</organizationUrl>
<roles>
<role>architect</role>
</roles>
</developer>
</developers>

<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.1</version>
</extension>
</extensions>

<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>
com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
</protocArtifact>
<pluginId>grpc-java</pluginId>
</configuration>
<executions>
<execution>
<id>compile</id>
<configuration>
</configuration>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<configuration>
<excludePackageNames>io.mishmash.opentelemetry.persistence.proto.v1</excludePackageNames>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
</plugin>
</plugins>
</build>

<dependencies>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
<dependency>
<groupId>io.mishmash.opentelemetry</groupId>
<artifactId>collector-embedded</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright 2024 Mishmash IO UK Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package io.mishmash.opentelemetry.persistence.proto;

import io.mishmash.opentelemetry.persistence.proto.v1.LogsPersistenceProto.PersistedLog;
import io.mishmash.opentelemetry.server.collector.Log;
import io.opentelemetry.proto.common.v1.AnyValue;

/**
* Utility class to help with protobuf serialization of {@link Log} instances.
*/
public final class ProtobufLogs {

private ProtobufLogs() {
// constructor is hidden
}

/**
* Get a protobuf representation of a {@link Log}.
*
* @param log the log signal
* @return a populated {@link PersistedLog.Builder}
*/
public static PersistedLog.Builder buildLog(final Log log) {
PersistedLog.Builder builder = PersistedLog.newBuilder()
.setBatchTimestamp(log.getBatchTimestamp())
.setBatchUUID(log.getBatchUUID())
.setSeqNo(log.getSeqNo())
.setIsValid(log.isValid());

if (log.getErrorMessage() != null) {
builder = builder.setErrorMessage(log.getErrorMessage());
}

if (log.getResource() != null) {
builder = builder
.addAllResourceAttributes(
log.getResource().getAttributesList())
.setResourceDroppedAttributesCount(
log.getResource().getDroppedAttributesCount());
}

if (log.getResourceSchemaUrl() != null) {
builder = builder
.setResourceSchemaUrl(log.getResourceSchemaUrl());
}

if (log.getScope() != null) {
builder = builder
.setScopeName(log.getScope().getName())
.setScopeVersion(log.getScope().getVersion())
.addAllScopeAttributes(
log.getScope().getAttributesList())
.setScopeDroppedAttributesCount(
log.getScope().getDroppedAttributesCount());
}

if (log.getLog() != null) {
builder = builder
.setTimeUnixNano(log.getLog().getTimeUnixNano())
.setObservedTimeUnixNano(
log.getLog().getObservedTimeUnixNano())
.setSeverityNumber(log.getLog().getSeverityNumber())
.setSeverityText(log.getLog().getSeverityText())
.addAllAttributes(log.getLog().getAttributesList())
.setDroppedAttributesCount(
log.getLog().getDroppedAttributesCount())
.setFlags(log.getLog().getFlags())
.setTraceId(log.getLog().getTraceId())
.setSpanId(log.getLog().getSpanId());

AnyValue body = log.getLog().getBody();

builder = builder.setBodyType(body.getValueCase().name());

switch (body.getValueCase()) {
case ARRAY_VALUE:
builder = builder.setBodyArray(body.getArrayValue());
break;
case BOOL_VALUE:
builder = builder.setBodyBool(body.getBoolValue());
break;
case BYTES_VALUE:
builder = builder.setBodyBytes(body.getBytesValue());
break;
case DOUBLE_VALUE:
builder = builder.setBodyDouble(body.getDoubleValue());
break;
case INT_VALUE:
builder = builder.setBodyInt(body.getIntValue());
break;
case KVLIST_VALUE:
builder = builder.setBodyKvlist(body.getKvlistValue());
break;
case STRING_VALUE:
builder = builder.setBodyString(body.getStringValue());
break;
case VALUE_NOT_SET:
// FIXME: what to do when not set?
break;
default:
// FIXME: should not ignore
break;
}
}

if (log.getLogSchemaUrl() != null) {
builder = builder.setLogSchemaUrl(log.getLogSchemaUrl());
}

return builder;
}
}
Loading

0 comments on commit b91a6fc

Please sign in to comment.