From 75cfca8628bb5440d8519bd46a5fec2214320d33 Mon Sep 17 00:00:00 2001 From: Szilard Parrag Date: Wed, 6 Mar 2024 18:06:51 +0100 Subject: [PATCH] [exporter/file] add append mode (#31369) **Description:** This adds a new option for configuring the append / truncate behavior of the fileexporter. **Link to tracking Issue:** https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/31364 **Testing:** Added `TestAppend` unit test and manually tested using `telemetrygen` and the following configuration: ```yaml receivers: otlp: protocols: grpc: endpoint: 0.0.0.0:4317 exporters: file: path: ./receiver_output_append.log append: true service: telemetry: metrics: level: detailed address: 0.0.0.0:9998 pipelines: logs: receivers: [otlp] exporters: [file] ``` **Documentation:** TODO: - [x] add documentation once we reached agreement regarding implementation / naming Signed-off-by: Szilard Parrag --- .chloggen/file-exporter_append_mode.yaml | 27 +++++++ exporter/fileexporter/README.md | 1 + exporter/fileexporter/config.go | 13 ++++ exporter/fileexporter/factory.go | 10 ++- exporter/fileexporter/factory_test.go | 2 +- exporter/fileexporter/file_exporter.go | 2 +- exporter/fileexporter/file_exporter_test.go | 86 ++++++++++++++++++++- 7 files changed, 136 insertions(+), 5 deletions(-) create mode 100644 .chloggen/file-exporter_append_mode.yaml diff --git a/.chloggen/file-exporter_append_mode.yaml b/.chloggen/file-exporter_append_mode.yaml new file mode 100644 index 000000000000..4520a010887f --- /dev/null +++ b/.chloggen/file-exporter_append_mode.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: fileexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: File write mode is configurable now (truncate or append) + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [31364] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/fileexporter/README.md b/exporter/fileexporter/README.md index c5cebb2ac942..b83ef3cc2743 100644 --- a/exporter/fileexporter/README.md +++ b/exporter/fileexporter/README.md @@ -47,6 +47,7 @@ The following settings are optional: - localtime : [default: false (use UTC)] whether or not the timestamps in backup files is formatted according to the host's local time. - `format`[default: json]: define the data format of encoded telemetry data. The setting can be overridden with `proto`. +- `append`[default: `false`] defines whether append to the file (`true`) or truncate (`false`). If `append: true` is set then setting `rotation` or `compression` is currently not supported. - `compression`[no default]: the compression algorithm used when exporting telemetry data to file. Supported compression algorithms:`zstd` - `flush_interval`[default: 1s]: `time.Duration` interval between flushes. See [time.ParseDuration](https://pkg.go.dev/time#ParseDuration) for valid formats. NOTE: a value without unit is in nanoseconds and `flush_interval` is ignored and writes are not buffered if `rotation` is set. diff --git a/exporter/fileexporter/config.go b/exporter/fileexporter/config.go index 01b228301eef..b3485c23d90f 100644 --- a/exporter/fileexporter/config.go +++ b/exporter/fileexporter/config.go @@ -5,6 +5,7 @@ package fileexporter // import "github.com/open-telemetry/opentelemetry-collecto import ( "errors" + "fmt" "time" "go.opentelemetry.io/collector/component" @@ -22,6 +23,12 @@ type Config struct { // Path of the file to write to. Path is relative to current directory. Path string `mapstructure:"path"` + // Mode defines whether the exporter should append to the file + // Options: + // - false[default]: truncates the file + // - true: appends to the file. + Append bool `mapstructure:"append"` + // Rotation defines an option about rotation of telemetry files Rotation *Rotation `mapstructure:"rotation"` @@ -70,6 +77,12 @@ func (cfg *Config) Validate() error { if cfg.Path == "" { return errors.New("path must be non-empty") } + if cfg.Append && cfg.Compression != "" { + return fmt.Errorf("append and compression enabled at the same time is not supported") + } + if cfg.Append && cfg.Rotation != nil { + return fmt.Errorf("append and rotation enabled at the same time is not supported") + } if cfg.FormatType != formatTypeJSON && cfg.FormatType != formatTypeProto { return errors.New("format type is not supported") } diff --git a/exporter/fileexporter/factory.go b/exporter/fileexporter/factory.go index b75a0c0eb528..5f7a1aaa727e 100644 --- a/exporter/fileexporter/factory.go +++ b/exporter/fileexporter/factory.go @@ -129,10 +129,16 @@ func newFileExporter(conf *Config) FileExporter { } } -func newFileWriter(path string, rotation *Rotation, flushInterval time.Duration, export exportFunc) (*fileWriter, error) { +func newFileWriter(path string, shouldAppend bool, rotation *Rotation, flushInterval time.Duration, export exportFunc) (*fileWriter, error) { var wc io.WriteCloser if rotation == nil { - f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) + fileFlags := os.O_RDWR | os.O_CREATE + if shouldAppend { + fileFlags |= os.O_APPEND + } else { + fileFlags |= os.O_TRUNC + } + f, err := os.OpenFile(path, fileFlags, 0644) if err != nil { return nil, err } diff --git a/exporter/fileexporter/factory_test.go b/exporter/fileexporter/factory_test.go index 1f8a99c79163..950bd4cce82c 100644 --- a/exporter/fileexporter/factory_test.go +++ b/exporter/fileexporter/factory_test.go @@ -168,7 +168,7 @@ func TestNewFileWriter(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := newFileWriter(tt.args.cfg.Path, tt.args.cfg.Rotation, tt.args.cfg.FlushInterval, nil) + got, err := newFileWriter(tt.args.cfg.Path, tt.args.cfg.Append, tt.args.cfg.Rotation, tt.args.cfg.FlushInterval, nil) defer func() { assert.NoError(t, got.file.Close()) }() diff --git a/exporter/fileexporter/file_exporter.go b/exporter/fileexporter/file_exporter.go index 18cf7aa58637..d37478c540d8 100644 --- a/exporter/fileexporter/file_exporter.go +++ b/exporter/fileexporter/file_exporter.go @@ -56,7 +56,7 @@ func (e *fileExporter) Start(_ context.Context, _ component.Host) error { export := buildExportFunc(e.conf) var err error - e.writer, err = newFileWriter(e.conf.Path, e.conf.Rotation, e.conf.FlushInterval, export) + e.writer, err = newFileWriter(e.conf.Path, e.conf.Append, e.conf.Rotation, e.conf.FlushInterval, export) if err != nil { return err } diff --git a/exporter/fileexporter/file_exporter_test.go b/exporter/fileexporter/file_exporter_test.go index 5cff81001f62..e1d6d5206b44 100644 --- a/exporter/fileexporter/file_exporter_test.go +++ b/exporter/fileexporter/file_exporter_test.go @@ -11,6 +11,7 @@ import ( "io" "os" "path/filepath" + "slices" "sync" "testing" "time" @@ -648,7 +649,7 @@ func TestFlushing(t *testing.T) { } export := buildExportFunc(fe.conf) var err error - fe.writer, err = newFileWriter(fe.conf.Path, fe.conf.Rotation, fe.conf.FlushInterval, export) + fe.writer, err = newFileWriter(fe.conf.Path, fe.conf.Append, fe.conf.Rotation, fe.conf.FlushInterval, export) assert.NoError(t, err) err = fe.writer.file.Close() assert.NoError(t, err) @@ -673,3 +674,86 @@ func TestFlushing(t *testing.T) { assert.EqualValues(t, b, bbuf.Bytes()) assert.NoError(t, fe.Shutdown(ctx)) } + +func TestAppend(t *testing.T) { + cfg := &Config{ + Path: tempFileName(t), + FlushInterval: time.Second, + Append: true, + } + + // Create a buffer to capture the output. + bbuf := &tsBuffer{b: &bytes.Buffer{}} + buf := &NopWriteCloser{bbuf} + // Wrap the buffer with the buffered writer closer that implements flush() method. + bwc := newBufferedWriteCloser(buf) + // Create a file exporter with flushing enabled. + feI := newFileExporter(cfg) + assert.IsType(t, &fileExporter{}, feI) + fe := feI.(*fileExporter) + + // Start the flusher. + ctx := context.Background() + fe.marshaller = &marshaller{ + formatType: fe.conf.FormatType, + tracesMarshaler: tracesMarshalers[fe.conf.FormatType], + metricsMarshaler: metricsMarshalers[fe.conf.FormatType], + logsMarshaler: logsMarshalers[fe.conf.FormatType], + compression: fe.conf.Compression, + compressor: buildCompressor(fe.conf.Compression), + } + export := buildExportFunc(fe.conf) + var err error + fe.writer, err = newFileWriter(fe.conf.Path, fe.conf.Append, fe.conf.Rotation, fe.conf.FlushInterval, export) + assert.NoError(t, err) + err = fe.writer.file.Close() + assert.NoError(t, err) + fe.writer.file = bwc + fe.writer.start() + + // Write 10 bytes. + b1 := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} + i, err := safeFileExporterWrite(fe, b1) + assert.NoError(t, err) + assert.EqualValues(t, len(b1), i, "bytes written") + + // Assert buf contains 0 bytes before flush is called. + assert.EqualValues(t, 0, bbuf.Len(), "before flush") + + // Wait 1.5 sec + time.Sleep(1500 * time.Millisecond) + + // Assert buf contains 10 bytes after flush is called. + assert.EqualValues(t, 10, bbuf.Len(), "after flush") + // Compare the content. + assert.EqualValues(t, b1, bbuf.Bytes()) + assert.NoError(t, fe.Shutdown(ctx)) + + // Restart the exporter + fe.writer, err = newFileWriter(fe.conf.Path, fe.conf.Append, fe.conf.Rotation, fe.conf.FlushInterval, export) + assert.NoError(t, err) + err = fe.writer.file.Close() + assert.NoError(t, err) + fe.writer.file = bwc + fe.writer.start() + + // Write 10 bytes - again + b2 := []byte{11, 12, 13, 14, 15, 16, 17, 18, 19, 20} + i, err = safeFileExporterWrite(fe, b2) + assert.NoError(t, err) + assert.EqualValues(t, len(b2), i, "bytes written") + + // Assert buf contains 10 bytes before flush is called. + assert.EqualValues(t, 10, bbuf.Len(), "after restart - before flush") + + // Wait 1.5 sec + time.Sleep(1500 * time.Millisecond) + + // Assert buf contains 20 bytes after flush is called. + assert.EqualValues(t, 20, bbuf.Len(), "after restart - after flush") + // Compare the content. + bComplete := slices.Clone(b1) + bComplete = append(bComplete, b2...) + assert.EqualValues(t, bComplete, bbuf.Bytes()) + assert.NoError(t, fe.Shutdown(ctx)) +}