Skip to content

Commit

Permalink
[exporter/file] add append mode (open-telemetry#31369)
Browse files Browse the repository at this point in the history
**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
This adds a new option for configuring the append / truncate behavior of
the fileexporter.

**Link to tracking Issue:**
open-telemetry#31364

**Testing:** Added `TestAppend` unit test and manually tested using
`telemetrygen` and the following configuration:

```yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317

exporters:
  file:
    path: ./receiver_output_append.log
    append: true
service:
  telemetry:
    metrics:
      level: detailed
      address: 0.0.0.0:9998
  pipelines:
    logs:
      receivers: [otlp]
      exporters: [file]
```

**Documentation:** <Describe the documentation added.>

TODO: 

- [x] add documentation once we reached agreement regarding
implementation / naming

Signed-off-by: Szilard Parrag <[email protected]>
  • Loading branch information
OverOrion authored and DougManton committed Mar 13, 2024
1 parent 82b498d commit 2a2e45f
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 5 deletions.
27 changes: 27 additions & 0 deletions .chloggen/file-exporter_append_mode.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: fileexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: File write mode is configurable now (truncate or append)

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31364]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
1 change: 1 addition & 0 deletions exporter/fileexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ The following settings are optional:
- localtime : [default: false (use UTC)] whether or not the timestamps in backup files is formatted according to the host's local time.

- `format`[default: json]: define the data format of encoded telemetry data. The setting can be overridden with `proto`.
- `append`[default: `false`] defines whether append to the file (`true`) or truncate (`false`). If `append: true` is set then setting `rotation` or `compression` is currently not supported.
- `compression`[no default]: the compression algorithm used when exporting telemetry data to file. Supported compression algorithms:`zstd`
- `flush_interval`[default: 1s]: `time.Duration` interval between flushes. See [time.ParseDuration](https://pkg.go.dev/time#ParseDuration) for valid formats.
NOTE: a value without unit is in nanoseconds and `flush_interval` is ignored and writes are not buffered if `rotation` is set.
Expand Down
13 changes: 13 additions & 0 deletions exporter/fileexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package fileexporter // import "github.com/open-telemetry/opentelemetry-collecto

import (
"errors"
"fmt"
"time"

"go.opentelemetry.io/collector/component"
Expand All @@ -22,6 +23,12 @@ type Config struct {
// Path of the file to write to. Path is relative to current directory.
Path string `mapstructure:"path"`

// Mode defines whether the exporter should append to the file
// Options:
// - false[default]: truncates the file
// - true: appends to the file.
Append bool `mapstructure:"append"`

// Rotation defines an option about rotation of telemetry files
Rotation *Rotation `mapstructure:"rotation"`

Expand Down Expand Up @@ -70,6 +77,12 @@ func (cfg *Config) Validate() error {
if cfg.Path == "" {
return errors.New("path must be non-empty")
}
if cfg.Append && cfg.Compression != "" {
return fmt.Errorf("append and compression enabled at the same time is not supported")
}
if cfg.Append && cfg.Rotation != nil {
return fmt.Errorf("append and rotation enabled at the same time is not supported")
}
if cfg.FormatType != formatTypeJSON && cfg.FormatType != formatTypeProto {
return errors.New("format type is not supported")
}
Expand Down
10 changes: 8 additions & 2 deletions exporter/fileexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,16 @@ func newFileExporter(conf *Config) FileExporter {
}
}

func newFileWriter(path string, rotation *Rotation, flushInterval time.Duration, export exportFunc) (*fileWriter, error) {
func newFileWriter(path string, shouldAppend bool, rotation *Rotation, flushInterval time.Duration, export exportFunc) (*fileWriter, error) {
var wc io.WriteCloser
if rotation == nil {
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
fileFlags := os.O_RDWR | os.O_CREATE
if shouldAppend {
fileFlags |= os.O_APPEND
} else {
fileFlags |= os.O_TRUNC
}
f, err := os.OpenFile(path, fileFlags, 0644)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion exporter/fileexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func TestNewFileWriter(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := newFileWriter(tt.args.cfg.Path, tt.args.cfg.Rotation, tt.args.cfg.FlushInterval, nil)
got, err := newFileWriter(tt.args.cfg.Path, tt.args.cfg.Append, tt.args.cfg.Rotation, tt.args.cfg.FlushInterval, nil)
defer func() {
assert.NoError(t, got.file.Close())
}()
Expand Down
2 changes: 1 addition & 1 deletion exporter/fileexporter/file_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (e *fileExporter) Start(_ context.Context, _ component.Host) error {
export := buildExportFunc(e.conf)

var err error
e.writer, err = newFileWriter(e.conf.Path, e.conf.Rotation, e.conf.FlushInterval, export)
e.writer, err = newFileWriter(e.conf.Path, e.conf.Append, e.conf.Rotation, e.conf.FlushInterval, export)
if err != nil {
return err
}
Expand Down
86 changes: 85 additions & 1 deletion exporter/fileexporter/file_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"io"
"os"
"path/filepath"
"slices"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -648,7 +649,7 @@ func TestFlushing(t *testing.T) {
}
export := buildExportFunc(fe.conf)
var err error
fe.writer, err = newFileWriter(fe.conf.Path, fe.conf.Rotation, fe.conf.FlushInterval, export)
fe.writer, err = newFileWriter(fe.conf.Path, fe.conf.Append, fe.conf.Rotation, fe.conf.FlushInterval, export)
assert.NoError(t, err)
err = fe.writer.file.Close()
assert.NoError(t, err)
Expand All @@ -673,3 +674,86 @@ func TestFlushing(t *testing.T) {
assert.EqualValues(t, b, bbuf.Bytes())
assert.NoError(t, fe.Shutdown(ctx))
}

func TestAppend(t *testing.T) {
cfg := &Config{
Path: tempFileName(t),
FlushInterval: time.Second,
Append: true,
}

// Create a buffer to capture the output.
bbuf := &tsBuffer{b: &bytes.Buffer{}}
buf := &NopWriteCloser{bbuf}
// Wrap the buffer with the buffered writer closer that implements flush() method.
bwc := newBufferedWriteCloser(buf)
// Create a file exporter with flushing enabled.
feI := newFileExporter(cfg)
assert.IsType(t, &fileExporter{}, feI)
fe := feI.(*fileExporter)

// Start the flusher.
ctx := context.Background()
fe.marshaller = &marshaller{
formatType: fe.conf.FormatType,
tracesMarshaler: tracesMarshalers[fe.conf.FormatType],
metricsMarshaler: metricsMarshalers[fe.conf.FormatType],
logsMarshaler: logsMarshalers[fe.conf.FormatType],
compression: fe.conf.Compression,
compressor: buildCompressor(fe.conf.Compression),
}
export := buildExportFunc(fe.conf)
var err error
fe.writer, err = newFileWriter(fe.conf.Path, fe.conf.Append, fe.conf.Rotation, fe.conf.FlushInterval, export)
assert.NoError(t, err)
err = fe.writer.file.Close()
assert.NoError(t, err)
fe.writer.file = bwc
fe.writer.start()

// Write 10 bytes.
b1 := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
i, err := safeFileExporterWrite(fe, b1)
assert.NoError(t, err)
assert.EqualValues(t, len(b1), i, "bytes written")

// Assert buf contains 0 bytes before flush is called.
assert.EqualValues(t, 0, bbuf.Len(), "before flush")

// Wait 1.5 sec
time.Sleep(1500 * time.Millisecond)

// Assert buf contains 10 bytes after flush is called.
assert.EqualValues(t, 10, bbuf.Len(), "after flush")
// Compare the content.
assert.EqualValues(t, b1, bbuf.Bytes())
assert.NoError(t, fe.Shutdown(ctx))

// Restart the exporter
fe.writer, err = newFileWriter(fe.conf.Path, fe.conf.Append, fe.conf.Rotation, fe.conf.FlushInterval, export)
assert.NoError(t, err)
err = fe.writer.file.Close()
assert.NoError(t, err)
fe.writer.file = bwc
fe.writer.start()

// Write 10 bytes - again
b2 := []byte{11, 12, 13, 14, 15, 16, 17, 18, 19, 20}
i, err = safeFileExporterWrite(fe, b2)
assert.NoError(t, err)
assert.EqualValues(t, len(b2), i, "bytes written")

// Assert buf contains 10 bytes before flush is called.
assert.EqualValues(t, 10, bbuf.Len(), "after restart - before flush")

// Wait 1.5 sec
time.Sleep(1500 * time.Millisecond)

// Assert buf contains 20 bytes after flush is called.
assert.EqualValues(t, 20, bbuf.Len(), "after restart - after flush")
// Compare the content.
bComplete := slices.Clone(b1)
bComplete = append(bComplete, b2...)
assert.EqualValues(t, bComplete, bbuf.Bytes())
assert.NoError(t, fe.Shutdown(ctx))
}

0 comments on commit 2a2e45f

Please sign in to comment.