diff --git a/exporter/fileexporter/buffered_writer.go b/exporter/fileexporter/buffered_writer.go new file mode 100644 index 000000000000..d802860c815a --- /dev/null +++ b/exporter/fileexporter/buffered_writer.go @@ -0,0 +1,51 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter" + +import ( + "bufio" + "io" + + "go.uber.org/multierr" +) + +// bufferedWriteCloser is intended to use more memory +// in order to optimize writing to disk to help improve performance. +type bufferedWriteCloser struct { + wrapped io.Closer + buffered *bufio.Writer +} + +var ( + _ io.WriteCloser = (*bufferedWriteCloser)(nil) +) + +func newBufferedWriteCloser(f io.WriteCloser) io.WriteCloser { + return &bufferedWriteCloser{ + wrapped: f, + buffered: bufio.NewWriter(f), + } +} + +func (bwc *bufferedWriteCloser) Write(p []byte) (n int, err error) { + return bwc.buffered.Write(p) +} + +func (bwc *bufferedWriteCloser) Close() error { + return multierr.Combine( + bwc.buffered.Flush(), + bwc.wrapped.Close(), + ) +} diff --git a/exporter/fileexporter/buffered_writer_test.go b/exporter/fileexporter/buffered_writer_test.go new file mode 100644 index 000000000000..7e4fb342eaee --- /dev/null +++ b/exporter/fileexporter/buffered_writer_test.go @@ -0,0 +1,102 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileexporter + +import ( + "bytes" + "fmt" + "io" + "os" + "path" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/multierr" +) + +const ( + msg = "it is a beautiful world" + + SizeByte = 1 + SizeKiloByte = 1 << (10 * iota) + SizeMegaByte +) + +type NopWriteCloser struct { + w io.Writer +} + +func (NopWriteCloser) Close() error { return nil } +func (wc *NopWriteCloser) Write(p []byte) (int, error) { return wc.w.Write(p) } + +func TestBufferedWrites(t *testing.T) { + t.Parallel() + + b := bytes.NewBuffer(nil) + w := newBufferedWriteCloser(&NopWriteCloser{b}) + + _, err := w.Write([]byte(msg)) + require.NoError(t, err, "Must not error when writing data") + assert.NoError(t, w.Close(), "Must not error when closing writer") + + assert.Equal(t, msg, b.String(), "Must match the expected string") +} + +var ( + benchmarkErr error +) + +func BenchmarkWriter(b *testing.B) { + tempfile := func(tb testing.TB) io.WriteCloser { + f, err := os.CreateTemp(tb.TempDir(), tb.Name()) + require.NoError(tb, err, "Must not error when creating benchmark temp file") + tb.Cleanup(func() { + assert.NoError(tb, os.RemoveAll(path.Dir(f.Name())), "Must clean up files after being written") + }) + return f + } + + for _, payloadSize := range []int{ + 10 * SizeKiloByte, + 100 * SizeKiloByte, + SizeMegaByte, + 10 * SizeMegaByte, + } { + payload := make([]byte, payloadSize) + for i := 0; i < payloadSize; i++ { + payload[i] = 'a' + } + for name, w := range map[string]io.WriteCloser{ + "discard": &NopWriteCloser{io.Discard}, + "buffered-discard": newBufferedWriteCloser(&NopWriteCloser{io.Discard}), + "raw-file": tempfile(b), + "buffered-file": newBufferedWriteCloser(tempfile(b)), + } { + w := w + b.Run(fmt.Sprintf("%s_%d_bytes", name, payloadSize), func(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + + var err error + for i := 0; i < b.N; i++ { + _, err = w.Write(payload) + } + benchmarkErr = multierr.Combine(err, w.Close()) + }) + } + } + +} diff --git a/exporter/fileexporter/factory.go b/exporter/fileexporter/factory.go index 8fca3db1171e..8f859de25850 100644 --- a/exporter/fileexporter/factory.go +++ b/exporter/fileexporter/factory.go @@ -146,7 +146,11 @@ func newFileExporter(conf *Config, writer io.WriteCloser) *fileExporter { func buildFileWriter(cfg *Config) (io.WriteCloser, error) { if cfg.Rotation == nil { - return os.OpenFile(cfg.Path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600) + f, err := os.OpenFile(cfg.Path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600) + if err != nil { + return nil, err + } + return newBufferedWriteCloser(f), nil } return &lumberjack.Logger{ Filename: cfg.Path, diff --git a/exporter/fileexporter/factory_test.go b/exporter/fileexporter/factory_test.go index ca034187dc5f..84c6daa05b37 100644 --- a/exporter/fileexporter/factory_test.go +++ b/exporter/fileexporter/factory_test.go @@ -17,7 +17,6 @@ package fileexporter import ( "context" "io" - "os" "testing" "github.com/stretchr/testify/assert" @@ -123,7 +122,7 @@ func TestBuildFileWriter(t *testing.T) { }, }, validate: func(t *testing.T, closer io.WriteCloser) { - _, ok := closer.(*os.File) + _, ok := closer.(*bufferedWriteCloser) assert.Equal(t, true, ok) }, }, diff --git a/exporter/fileexporter/go.mod b/exporter/fileexporter/go.mod index dc26a731213d..31c495176fd1 100644 --- a/exporter/fileexporter/go.mod +++ b/exporter/fileexporter/go.mod @@ -12,6 +12,7 @@ require ( go.opentelemetry.io/collector/confmap v0.69.2-0.20230112233839-f2a0133bf677 go.opentelemetry.io/collector/consumer v0.69.2-0.20230112233839-f2a0133bf677 go.opentelemetry.io/collector/pdata v1.0.0-rc3.0.20230112233839-f2a0133bf677 + go.uber.org/multierr v1.9.0 gopkg.in/natefinch/lumberjack.v2 v2.0.0 ) @@ -35,7 +36,6 @@ require ( go.opentelemetry.io/otel/metric v0.34.0 // indirect go.opentelemetry.io/otel/trace v1.11.2 // indirect go.uber.org/atomic v1.10.0 // indirect - go.uber.org/multierr v1.9.0 // indirect go.uber.org/zap v1.24.0 // indirect golang.org/x/net v0.5.0 // indirect golang.org/x/sys v0.4.0 // indirect