Skip to content

Commit

Permalink
[exporter/file] Improving write performance speed (#17543)
Browse files Browse the repository at this point in the history
* Adding support for buffered writes
  • Loading branch information
MovieStoreGuy authored Jan 17, 2023
1 parent a6d8f11 commit e991cc7
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 4 deletions.
51 changes: 51 additions & 0 deletions exporter/fileexporter/buffered_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fileexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter"

import (
"bufio"
"io"

"go.uber.org/multierr"
)

// bufferedWriteCloser is intended to use more memory
// in order to optimize writing to disk to help improve performance.
type bufferedWriteCloser struct {
wrapped io.Closer
buffered *bufio.Writer
}

var (
_ io.WriteCloser = (*bufferedWriteCloser)(nil)
)

func newBufferedWriteCloser(f io.WriteCloser) io.WriteCloser {
return &bufferedWriteCloser{
wrapped: f,
buffered: bufio.NewWriter(f),
}
}

func (bwc *bufferedWriteCloser) Write(p []byte) (n int, err error) {
return bwc.buffered.Write(p)
}

func (bwc *bufferedWriteCloser) Close() error {
return multierr.Combine(
bwc.buffered.Flush(),
bwc.wrapped.Close(),
)
}
102 changes: 102 additions & 0 deletions exporter/fileexporter/buffered_writer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fileexporter

import (
"bytes"
"fmt"
"io"
"os"
"path"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/multierr"
)

const (
msg = "it is a beautiful world"

SizeByte = 1
SizeKiloByte = 1 << (10 * iota)
SizeMegaByte
)

type NopWriteCloser struct {
w io.Writer
}

func (NopWriteCloser) Close() error { return nil }
func (wc *NopWriteCloser) Write(p []byte) (int, error) { return wc.w.Write(p) }

func TestBufferedWrites(t *testing.T) {
t.Parallel()

b := bytes.NewBuffer(nil)
w := newBufferedWriteCloser(&NopWriteCloser{b})

_, err := w.Write([]byte(msg))
require.NoError(t, err, "Must not error when writing data")
assert.NoError(t, w.Close(), "Must not error when closing writer")

assert.Equal(t, msg, b.String(), "Must match the expected string")
}

var (
benchmarkErr error
)

func BenchmarkWriter(b *testing.B) {
tempfile := func(tb testing.TB) io.WriteCloser {
f, err := os.CreateTemp(tb.TempDir(), tb.Name())
require.NoError(tb, err, "Must not error when creating benchmark temp file")
tb.Cleanup(func() {
assert.NoError(tb, os.RemoveAll(path.Dir(f.Name())), "Must clean up files after being written")
})
return f
}

for _, payloadSize := range []int{
10 * SizeKiloByte,
100 * SizeKiloByte,
SizeMegaByte,
10 * SizeMegaByte,
} {
payload := make([]byte, payloadSize)
for i := 0; i < payloadSize; i++ {
payload[i] = 'a'
}
for name, w := range map[string]io.WriteCloser{
"discard": &NopWriteCloser{io.Discard},
"buffered-discard": newBufferedWriteCloser(&NopWriteCloser{io.Discard}),
"raw-file": tempfile(b),
"buffered-file": newBufferedWriteCloser(tempfile(b)),
} {
w := w
b.Run(fmt.Sprintf("%s_%d_bytes", name, payloadSize), func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()

var err error
for i := 0; i < b.N; i++ {
_, err = w.Write(payload)
}
benchmarkErr = multierr.Combine(err, w.Close())
})
}
}

}
6 changes: 5 additions & 1 deletion exporter/fileexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,11 @@ func newFileExporter(conf *Config, writer io.WriteCloser) *fileExporter {

func buildFileWriter(cfg *Config) (io.WriteCloser, error) {
if cfg.Rotation == nil {
return os.OpenFile(cfg.Path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
f, err := os.OpenFile(cfg.Path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
return nil, err
}
return newBufferedWriteCloser(f), nil
}
return &lumberjack.Logger{
Filename: cfg.Path,
Expand Down
3 changes: 1 addition & 2 deletions exporter/fileexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package fileexporter
import (
"context"
"io"
"os"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -123,7 +122,7 @@ func TestBuildFileWriter(t *testing.T) {
},
},
validate: func(t *testing.T, closer io.WriteCloser) {
_, ok := closer.(*os.File)
_, ok := closer.(*bufferedWriteCloser)
assert.Equal(t, true, ok)
},
},
Expand Down
2 changes: 1 addition & 1 deletion exporter/fileexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
go.opentelemetry.io/collector/confmap v0.69.2-0.20230112233839-f2a0133bf677
go.opentelemetry.io/collector/consumer v0.69.2-0.20230112233839-f2a0133bf677
go.opentelemetry.io/collector/pdata v1.0.0-rc3.0.20230112233839-f2a0133bf677
go.uber.org/multierr v1.9.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0
)

Expand All @@ -35,7 +36,6 @@ require (
go.opentelemetry.io/otel/metric v0.34.0 // indirect
go.opentelemetry.io/otel/trace v1.11.2 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/net v0.5.0 // indirect
golang.org/x/sys v0.4.0 // indirect
Expand Down

0 comments on commit e991cc7

Please sign in to comment.