Signal disk buffering (#913)
Co-authored-by: Gregor Zeitlinger <[email protected]>
Co-authored-by: jason plumb <[email protected]>
Co-authored-by: jack-berg <[email protected]>
4 people authored Jul 14, 2023
1 parent 3623f98 commit 2b8888d
Showing 68 changed files with 6,661 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .github/component_owners.yml
@@ -22,6 +22,9 @@ components:
consistent-sampling:
- oertl
- PeterF778
disk-buffering:
- LikeTheSalad
- zeitlinger
samplers:
- iNikem
- trask
56 changes: 56 additions & 0 deletions disk-buffering/CONTRIBUTING.md
@@ -0,0 +1,56 @@
# Contributor Guide

Each of the three exporters provided by this
tool ([LogRecordDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordDiskExporter.java), [MetricDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/MetricDiskExporter.java)
and [SpanDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/SpanDiskExporter.java))
is responsible for two actions, `write` and `read/delegate`. The `write` action happens
automatically whenever a batch of signals arrives from the processor, while the `read/delegate`
action has to be triggered manually by the consumer of this library, as explained in
the [README](README.md).

## Writing overview

![Writing flow](assets/writing-flow.png)

* The writing process happens automatically within each exporter's
  `export(Collection<SignalData> signals)` method, which is called by the configured signal
  processor.
* When a batch of signals is received, it is delegated to
  the [DiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporters/DiskExporter.java)
  class, which serializes it using an implementation
  of [SignalSerializer](src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/SignalSerializer.java)
  and then appends the serialized data to a file using an instance of
  the [Storage](src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java)
  class, as sketched after this list.
* The data is written to the file directly, without buffering, to make sure no data gets lost
  in case the application ends unexpectedly.
* Each disk exporter stores its signals in its own folder, which is expected to contain files
  belonging to that signal type only.
* Each file may contain more than one batch of signals if the configured size limits allow it.
* If the configured folder size limit has been reached and a new file is needed to keep storing
  data, the oldest available file is deleted to make room for the new one.
* The [Storage](src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java),
  [FolderManager](src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FolderManager.java)
  and [WritableFile](src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFile.java)
  classes contain more details on how data is written to a file.
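
The sketch below condenses the flow above into a single class. The shapes are hypothetical
simplifications of `DiskExporter`, `SignalSerializer` and `Storage`, meant only to show the order
of operations, not the actual internal API.

```java
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Collection;

// Hypothetical condensed view of the write path; the real DiskExporter,
// SignalSerializer and Storage classes carry far more state than this.
final class WritePathSketch<T> {
  interface Serializer<T> {
    byte[] serialize(Collection<T> signals);
  }

  private final Serializer<T> serializer;
  private final FileOutputStream out; // stands in for Storage + WritableFile

  WritePathSketch(Serializer<T> serializer, FileOutputStream out) {
    this.serializer = serializer;
    this.out = out;
  }

  // Called by the signal processor with each batch handed to the exporter.
  CompletableResultCode export(Collection<T> signals) {
    try {
      // Serialize the batch and append it directly to the file, with no
      // intermediate buffer, so nothing is lost if the process dies.
      out.write(serializer.serialize(signals));
      out.flush();
      return CompletableResultCode.ofSuccess();
    } catch (IOException e) {
      return CompletableResultCode.ofFailure();
    }
  }
}
```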

## Reading overview

![Reading flow](assets/reading-flow.png)

* The reading process has to be triggered manually by the library consumer, as explained in
  the [README](README.md).
* A single file is read at a time; after its data is successfully exported, that data is removed
  from the file, until the file is emptied. Each file created during the writing process carries a
  timestamp in milliseconds, which is used to determine which file to start reading from: the
  oldest one available (a sketch of this selection step follows this list).
* If the oldest available file is stale, which is determined based on the configuration provided
  when creating the disk exporter, it is ignored and the next oldest (and unexpired) one is used
  instead.
* All stale and empty files are removed as a new file is created.
* The [Storage](src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java),
  [FolderManager](src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FolderManager.java)
  and [ReadableFile](src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFile.java)
  classes contain more details on the file reading process.
* Note that the reader delegates data to the exporter exactly as it was received; it does not try
  to batch data (but this could be an optimization in the future).
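
As an illustration of the file-selection step, the sketch below picks the oldest unexpired file in
a signal folder, assuming each file is named after its creation timestamp in milliseconds (an
assumption based on the description above). `FolderManager`'s actual logic also accounts for the
minimum read age and for files still being written.

```java
import java.io.File;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Optional;

// Hypothetical sketch of picking the oldest unexpired file to read from.
final class OldestFilePicker {
  static Optional<File> pickReadableFile(
      File signalFolder, long maxReadAgeMillis, long nowMillis) {
    File[] files = signalFolder.listFiles();
    if (files == null) {
      return Optional.empty();
    }
    return Arrays.stream(files)
        // Assumes each file name is its creation time in milliseconds.
        .filter((File file) -> nowMillis - Long.parseLong(file.getName()) <= maxReadAgeMillis)
        // The oldest remaining file is the one to read from next.
        .min(Comparator.comparingLong((File file) -> Long.parseLong(file.getName())));
  }
}
```
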
113 changes: 113 additions & 0 deletions disk-buffering/README.md
@@ -0,0 +1,113 @@
# Disk buffering

This module provides signal exporter wrappers that intercept and store signals in files which can
be sent later on demand. At a high level, there are two separate processes in place: one that
writes data to disk, and one that reads/exports the previously stored data.

* Each exporter stores the received data on disk automatically, right after it is handed over by
  its processor.
* Reading the data back from disk and exporting it has to be done manually. At the moment there is
  no automatic mechanism to do so. There is more information on how this can be achieved
  under [Reading data](#reading-data).

> For more detailed information on how the whole process works, take a look at
> the [CONTRIBUTING](CONTRIBUTING.md) file.

## Configuration

The configurable parameters are provided **per exporter**. The available ones are listed below; a
construction sketch follows the list.

* Max file size, defaults to 1MB.
* Max folder size, defaults to 10MB. All files are stored in a single folder per signal type,
  so if all 3 signal types are stored, the total disk space taken by default would be 30MB.
* Max age for file writing, defaults to 30 seconds.
* Min age for file reading, defaults to 33 seconds. It must be greater than the max age for file
  writing.
* Max age for file reading, defaults to 18 hours. After that time passes, the file will be
  considered stale and will be removed when new files are created. No more data will be read from a
  file past this time.
* An instance
  of [TemporaryFileProvider](src/main/java/io/opentelemetry/contrib/disk/buffering/internal/files/TemporaryFileProvider.java),
  defaults to calling `File.createTempFile`. This provider is used when reading from the disk
  in order to create a temporary file from which each line (a batch of signals) is read and
  sequentially removed from the original cache file right after the data has been successfully
  exported.
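
As a quick construction sketch: the defaults can be obtained directly, and custom limits would
presumably be set through `StorageConfiguration`'s own API. The builder method names below are
illustrative assumptions only, not the verified API; check
the [StorageConfiguration](src/main/java/io/opentelemetry/contrib/disk/buffering/internal/StorageConfiguration.java)
source for the real ones.

```java
// Known from this README: a configuration with all default values.
StorageConfiguration config = StorageConfiguration.getDefault();

// Hypothetical builder usage for custom limits - the setter names below are
// assumptions for illustration; consult StorageConfiguration for the actual API.
// StorageConfiguration custom = StorageConfiguration.builder()
//     .setMaxFileSize(1024 * 1024)        // max file size: 1MB
//     .setMaxFolderSize(10 * 1024 * 1024) // max folder size: 10MB
//     .build();
```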

## Usage

### Storing data

In order to use it, you need to wrap your own exporter with a new instance of one of
the wrappers provided here:

* For a LogRecordExporter, it must be wrapped within
a [LogRecordDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordDiskExporter.java).
* For a MetricExporter, it must be wrapped within
a [MetricDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/MetricDiskExporter.java).
* For a SpanExporter, it must be wrapped within
a [SpanDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/SpanDiskExporter.java).

Each wrapper needs the following when instantiated:

* The exporter to be wrapped.
* A File instance of the root directory where all the data is going to be written. The same root
  dir can be used for all the wrappers, since each one creates its own folder inside it.
* An instance
of [StorageConfiguration](src/main/java/io/opentelemetry/contrib/disk/buffering/internal/StorageConfiguration.java)
with the desired parameters. You can create one with default values by
calling `StorageConfiguration.getDefault()`.

After wrapping your exporters, you must register the wrapper as the exporter you'll use. It will
take care of always storing the data it receives.

#### Set up example for spans

```java
// Creating the SpanExporter of our choice.
SpanExporter mySpanExporter = OtlpGrpcSpanExporter.getDefault();

// Wrapping our exporter with its disk exporter.
SpanDiskExporter diskExporter = SpanDiskExporter.create(mySpanExporter, new File("/my/signals/cache/dir"), StorageConfiguration.getDefault());

// Registering the disk exporter within our OpenTelemetry instance.
SdkTracerProvider myTraceProvider = SdkTracerProvider.builder()
.addSpanProcessor(SimpleSpanProcessor.create(diskExporter))
.build();
OpenTelemetrySdk.builder()
.setTracerProvider(myTraceProvider)
.buildAndRegisterGlobal();

```

### Reading data

Each exporter wrapper can read from the disk and send the retrieved data over to its
wrapped exporter by calling the following method:

```java
try {
  if (diskExporter.exportStoredBatch(1, TimeUnit.SECONDS)) {
    // A batch was successfully exported and removed from disk. You can call this method for as long as it keeps returning true.
  } else {
    // Either there was no data on disk or the wrapped exporter returned CompletableResultCode.ofFailure().
  }
} catch (IOException e) {
  // Something unexpected happened.
}
```
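
Since the method keeps returning `true` while there are batches left, draining everything currently
stored can be done with a simple loop. A minimal sketch:

```java
// Minimal drain loop: export stored batches until none remain or the
// wrapped exporter reports a failure.
try {
  while (diskExporter.exportStoredBatch(1, TimeUnit.SECONDS)) {
    // Each successful iteration exported one batch and removed it from disk.
  }
} catch (IOException e) {
  // Something unexpected happened while reading from disk.
}
```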

Both the writing and reading processes can run in parallel without overlapping, because each is
meant to happen on different files. The configurable parameters guarantee that the reader and
writer never accidentally meet in the same file by setting non-overlapping time frames for each
action on a single file. For example, with the default values a file is written to for at most 30
seconds, while reading only starts once a file is at least 33 seconds old, so a file being read can
no longer be written to. On top of that, there is a mechanism in place to avoid overlapping in edge
cases where the time frames have ended but the resources have not been released. For that mechanism
to work properly, this tool assumes that both the reading and the writing actions are executed
within the same application process.

## Component owners

- [Cesar Munoz](https://github.com/LikeTheSalad), Elastic
- [Gregor Zeitlinger](https://github.com/zeitlinger), Grafana

Learn more about component owners in [component_owners.yml](../.github/component_owners.yml).
Binary file added disk-buffering/assets/reading-flow.png
Binary file added disk-buffering/assets/writing-flow.png
50 changes: 50 additions & 0 deletions disk-buffering/build.gradle.kts
@@ -0,0 +1,50 @@
import ru.vyarus.gradle.plugin.animalsniffer.AnimalSniffer

plugins {
id("otel.java-conventions")
id("otel.publish-conventions")
id("me.champeau.jmh") version "0.7.1"
id("ru.vyarus.animalsniffer") version "1.7.1"
}

description = "Exporter implementations that store signals on disk"
otelJava.moduleName.set("io.opentelemetry.contrib.exporters.disk")

java {
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
}

val autovalueVersion = "1.10.1"
dependencies {
api("io.opentelemetry:opentelemetry-sdk")
implementation("io.opentelemetry:opentelemetry-exporter-otlp-common")
implementation("io.opentelemetry.proto:opentelemetry-proto:0.20.0-alpha")
compileOnly("com.google.auto.value:auto-value-annotations:$autovalueVersion")
annotationProcessor("com.google.auto.value:auto-value:$autovalueVersion")
signature("com.toasttab.android:gummy-bears-api-24:0.5.1@signature")
testImplementation("org.mockito:mockito-inline:4.11.0")
testImplementation("io.opentelemetry:opentelemetry-sdk-testing")
}

animalsniffer {
sourceSets = listOf(java.sourceSets.main.get())
}

// Always declaring an output makes this task properly participate in task up-to-date checks
tasks.withType<AnimalSniffer> {
reports.text.required.set(true)
}

// Attaching animalsniffer check to the compilation process.
tasks.named("classes").configure {
finalizedBy("animalsnifferMain")
}

jmh {
warmupIterations.set(0)
fork.set(2)
iterations.set(5)
timeOnIteration.set("5s")
timeUnit.set("ms")
}
@@ -0,0 +1,57 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering.internal.files.utils;

import io.opentelemetry.contrib.disk.buffering.internal.storage.files.utils.FileTransferUtil;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;

public class FileTransferUtilBenchmark {

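// Measures the average time to transfer roughly 1MB from the input file to the output file.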
@Benchmark
@BenchmarkMode(Mode.AverageTime)
public void fileTransfer(FileTransferState state) throws IOException {
state.fileTransferUtil.transferBytes(state.offset, state.amountOfBytesToTransfer);
}

@State(Scope.Benchmark)
public static class FileTransferState {
public FileTransferUtil fileTransferUtil;
public int offset;
public int amountOfBytesToTransfer;
private File inputFile;
private File outputFile;

@Setup
public void setUp() throws IOException {
outputFile = File.createTempFile("output", ".txt");
inputFile = File.createTempFile("input", ".txt");
int totalDataSize = 1024 * 1024; // 1MB
byte[] data = new byte[totalDataSize];
Files.write(inputFile.toPath(), data, StandardOpenOption.CREATE);
fileTransferUtil = new FileTransferUtil(new FileInputStream(inputFile), outputFile);
offset = 512;
amountOfBytesToTransfer = totalDataSize - offset;
}

@TearDown
public void tearDown() throws IOException {
fileTransferUtil.close();
inputFile.delete();
outputFile.delete();
}
}
}
@@ -0,0 +1,95 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering;

import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.exporters.DiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.contrib.disk.buffering.internal.storage.utils.StorageClock;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.logs.LogRecordProcessor;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.TimeUnit;

/**
 * This is a {@link LogRecordExporter} wrapper that intercepts all the signals sent out to be
 * exported and stores them on disk so that they can be exported later.
 *
 * <p>In order to use it, you need to wrap your own {@link LogRecordExporter} with a new instance
 * of this one, which will be the one you need to register in your {@link LogRecordProcessor}.
 */
public final class LogRecordDiskExporter implements LogRecordExporter, StoredBatchExporter {
private final LogRecordExporter wrapped;
private final DiskExporter<LogRecordData> diskExporter;

/**
 * Creates a new instance of {@link LogRecordDiskExporter}.
 *
 * @param wrapped - The exporter to which the data retrieved from the disk will be delegated.
 * @param rootDir - The directory where this signal's cache dir will be created and into which
 *     all the data will be written.
 * @param configuration - How you want to manage the storage process.
 * @throws IOException If no dir can be created in rootDir.
 */
public static LogRecordDiskExporter create(
LogRecordExporter wrapped, File rootDir, StorageConfiguration configuration)
throws IOException {
return create(wrapped, rootDir, configuration, StorageClock.getInstance());
}

// This is used for testing purposes.
static LogRecordDiskExporter create(
LogRecordExporter wrapped,
File rootDir,
StorageConfiguration configuration,
StorageClock clock)
throws IOException {
return new LogRecordDiskExporter(wrapped, rootDir, configuration, clock);
}

private LogRecordDiskExporter(
LogRecordExporter wrapped,
File rootDir,
StorageConfiguration configuration,
StorageClock clock)
throws IOException {
this.wrapped = wrapped;
diskExporter =
new DiskExporter<>(
rootDir, configuration, "logs", SignalSerializer.ofLogs(), wrapped::export, clock);
}

@Override
public CompletableResultCode export(Collection<LogRecordData> logs) {
return diskExporter.onExport(logs);
}

@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode shutdown() {
try {
diskExporter.onShutDown();
} catch (IOException e) {
return CompletableResultCode.ofFailure();
} finally {
wrapped.shutdown();
}
return CompletableResultCode.ofSuccess();
}

@Override
public boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException {
return diskExporter.exportStoredBatch(timeout, unit);
}
}
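
By analogy with the span set-up example in the README, wiring this exporter in could look like the
sketch below. `OtlpGrpcLogRecordExporter`, `SdkLoggerProvider` and `SimpleLogRecordProcessor` are
standard OpenTelemetry SDK classes, but the snippet itself is an assumption-based sketch, not taken
from the module's docs.

```java
// Sketch: wrapping a log exporter, mirroring the README's span example.
LogRecordExporter myLogExporter = OtlpGrpcLogRecordExporter.getDefault();

// Wrap it with the disk exporter so received log records are buffered on disk.
LogRecordDiskExporter diskExporter = LogRecordDiskExporter.create(
    myLogExporter, new File("/my/signals/cache/dir"), StorageConfiguration.getDefault());

// Register the disk exporter as the one the SDK uses.
SdkLoggerProvider loggerProvider = SdkLoggerProvider.builder()
    .addLogRecordProcessor(SimpleLogRecordProcessor.create(diskExporter))
    .build();
```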