Skip to content

Commit

Permalink
[disk-buffering] Split serializer (open-telemetry#1167)
Browse files Browse the repository at this point in the history
  • Loading branch information
breedx-splk authored Jan 25, 2024
1 parent aaf0446 commit 5000adf
Show file tree
Hide file tree
Showing 21 changed files with 176 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import java.io.IOException;
Expand All @@ -24,7 +24,7 @@ public static LogRecordFromDiskExporter create(
FromDiskExporterImpl.<LogRecordData>builder()
.setFolderName("logs")
.setStorageConfiguration(config)
.setDeserializer(SignalSerializer.ofLogs())
.setDeserializer(SignalDeserializer.ofLogs())
.setExportFunction(exporter::export)
.build();
return new LogRecordFromDiskExporter(delegate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.io.IOException;
Expand All @@ -24,7 +24,7 @@ public static MetricFromDiskExporter create(MetricExporter exporter, StorageConf
FromDiskExporterImpl.<MetricData>builder()
.setFolderName("metrics")
.setStorageConfiguration(config)
.setDeserializer(SignalSerializer.ofMetrics())
.setDeserializer(SignalDeserializer.ofMetrics())
.setExportFunction(exporter::export)
.build();
return new MetricFromDiskExporter(delegate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.io.IOException;
Expand All @@ -24,7 +24,7 @@ public static SpanFromDiskExporter create(SpanExporter exporter, StorageConfigur
FromDiskExporterImpl.<SpanData>builder()
.setFolderName("spans")
.setStorageConfiguration(config)
.setDeserializer(SignalSerializer.ofSpans())
.setDeserializer(SignalDeserializer.ofSpans())
.setExportFunction(exporter::export)
.build();
return new SpanFromDiskExporter(delegate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,40 +5,29 @@

package io.opentelemetry.contrib.disk.buffering.internal.exporter;

import static java.util.Collections.emptyList;

import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
import io.opentelemetry.contrib.disk.buffering.internal.storage.StorageBuilder;
import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;

public class FromDiskExporterBuilder<T> {

private SignalSerializer<T> serializer = noopSerializer();
private SignalDeserializer<T> serializer = noopDeserializer();
private Function<Collection<T>, CompletableResultCode> exportFunction =
x -> CompletableResultCode.ofFailure();

@NotNull
private static <T> SignalSerializer<T> noopSerializer() {
return new SignalSerializer<T>() {

@Override
public byte[] serialize(Collection<T> ts) {
return new byte[0];
}

@Override
public List<T> deserialize(byte[] source) {
return Collections.emptyList();
}
};
private static <T> SignalDeserializer<T> noopDeserializer() {
return x -> emptyList();
}

private final StorageBuilder storageBuilder = Storage.builder();
Expand All @@ -62,7 +51,7 @@ public FromDiskExporterBuilder<T> setStorageClock(Clock clock) {
}

@CanIgnoreReturnValue
public FromDiskExporterBuilder<T> setDeserializer(SignalSerializer<T> serializer) {
public FromDiskExporterBuilder<T> setDeserializer(SignalDeserializer<T> serializer) {
this.serializer = serializer;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package io.opentelemetry.contrib.disk.buffering.internal.exporter;

import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult;
import io.opentelemetry.sdk.common.CompletableResultCode;
Expand All @@ -22,12 +22,12 @@
*/
public final class FromDiskExporterImpl<EXPORT_DATA> implements FromDiskExporter {
private final Storage storage;
private final SignalSerializer<EXPORT_DATA> deserializer;
private final SignalDeserializer<EXPORT_DATA> deserializer;
private final Function<Collection<EXPORT_DATA>, CompletableResultCode> exportFunction;
private static final Logger logger = Logger.getLogger(FromDiskExporterImpl.class.getName());

FromDiskExporterImpl(
SignalSerializer<EXPORT_DATA> deserializer,
SignalDeserializer<EXPORT_DATA> deserializer,
Function<Collection<EXPORT_DATA>, CompletableResultCode> exportFunction,
Storage storage) {
this.deserializer = deserializer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,11 @@
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

public final class ToDiskExporterBuilder<T> {

private SignalSerializer<T> serializer =
new SignalSerializer<T>() {

@Override
public byte[] serialize(Collection<T> ts) {
return new byte[0];
}

@Override
public List<T> deserialize(byte[] source) {
return Collections.emptyList();
}
};
private SignalSerializer<T> serializer = ts -> new byte[0];

private final StorageBuilder storageBuilder = Storage.builder();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers;

import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.logs.ProtoLogsDataMapper;
import io.opentelemetry.proto.logs.v1.LogsData;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import java.io.IOException;
import java.util.List;

public final class LogRecordDataDeserializer implements SignalDeserializer<LogRecordData> {
private static final LogRecordDataDeserializer INSTANCE = new LogRecordDataDeserializer();

private LogRecordDataDeserializer() {}

static LogRecordDataDeserializer getInstance() {
return INSTANCE;
}

@Override
public List<LogRecordData> deserialize(byte[] source) {
try {
return ProtoLogsDataMapper.getInstance().fromProto(LogsData.ADAPTER.decode(source));
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers;

import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.metrics.ProtoMetricsDataMapper;
import io.opentelemetry.proto.metrics.v1.MetricsData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import java.io.IOException;
import java.util.List;

public final class MetricDataDeserializer implements SignalDeserializer<MetricData> {
private static final MetricDataDeserializer INSTANCE = new MetricDataDeserializer();

private MetricDataDeserializer() {}

static MetricDataDeserializer getInstance() {
return INSTANCE;
}

@Override
public List<MetricData> deserialize(byte[] source) {
try {
return ProtoMetricsDataMapper.getInstance().fromProto(MetricsData.ADAPTER.decode(source));
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers;

import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.List;

public interface SignalDeserializer<SDK_ITEM> {

static SignalDeserializer<SpanData> ofSpans() {
return SpanDataDeserializer.getInstance();
}

static SignalDeserializer<MetricData> ofMetrics() {
return MetricDataDeserializer.getInstance();
}

static SignalDeserializer<LogRecordData> ofLogs() {
return LogRecordDataDeserializer.getInstance();
}

List<SDK_ITEM> deserialize(byte[] source);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers;

import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.spans.ProtoSpansDataMapper;
import io.opentelemetry.proto.trace.v1.TracesData;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.util.List;

public final class SpanDataDeserializer implements SignalDeserializer<SpanData> {
private static final SpanDataDeserializer INSTANCE = new SpanDataDeserializer();

private SpanDataDeserializer() {}

static SpanDataDeserializer getInstance() {
return INSTANCE;
}

@Override
public List<SpanData> deserialize(byte[] source) {
try {
return ProtoSpansDataMapper.getInstance().fromProto(TracesData.ADAPTER.decode(source));
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.List;

public final class LogRecordDataSerializer implements SignalSerializer<LogRecordData> {
private static final LogRecordDataSerializer INSTANCE = new LogRecordDataSerializer();
Expand All @@ -35,13 +34,4 @@ public byte[] serialize(Collection<LogRecordData> logRecordData) {
throw new IllegalStateException(e);
}
}

@Override
public List<LogRecordData> deserialize(byte[] source) {
try {
return ProtoLogsDataMapper.getInstance().fromProto(LogsData.ADAPTER.decode(source));
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.List;

public final class MetricDataSerializer implements SignalSerializer<MetricData> {
private static final MetricDataSerializer INSTANCE = new MetricDataSerializer();
Expand All @@ -35,13 +34,4 @@ public byte[] serialize(Collection<MetricData> metricData) {
throw new IllegalStateException(e);
}
}

@Override
public List<MetricData> deserialize(byte[] source) {
try {
return ProtoMetricsDataMapper.getInstance().fromProto(MetricsData.ADAPTER.decode(source));
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.Collection;
import java.util.List;

public interface SignalSerializer<SDK_ITEM> {

Expand All @@ -26,6 +25,4 @@ static SignalSerializer<LogRecordData> ofLogs() {
}

byte[] serialize(Collection<SDK_ITEM> items);

List<SDK_ITEM> deserialize(byte[] source);
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.List;

public final class SpanDataSerializer implements SignalSerializer<SpanData> {
private static final SpanDataSerializer INSTANCE = new SpanDataSerializer();
Expand All @@ -35,13 +34,4 @@ public byte[] serialize(Collection<SpanData> spanData) {
throw new IllegalStateException(e);
}
}

@Override
public List<SpanData> deserialize(byte[] source) {
try {
return ProtoSpansDataMapper.getInstance().fromProto(TracesData.ADAPTER.decode(source));
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
}
Loading

0 comments on commit 5000adf

Please sign in to comment.