Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/disk-buffer' into submod-diskbuff
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Sep 3, 2020
2 parents e620fd2 + ee559ea commit 51d296b
Show file tree
Hide file tree
Showing 13 changed files with 386 additions and 76 deletions.
3 changes: 1 addition & 2 deletions operator/buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/observiq/stanza/entry"
"github.com/observiq/stanza/operator"
Expand All @@ -13,7 +12,7 @@ import (
type Buffer interface {
Add(context.Context, *entry.Entry) error
Read([]*entry.Entry) (func(), int, error)
ReadWait([]*entry.Entry, <-chan time.Time) (func(), int, error)
ReadWait(context.Context, []*entry.Entry) (func(), int, error)
Close() error
}

Expand Down
1 change: 1 addition & 0 deletions operator/buffer/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func TestBufferUnmarshalYAML(t *testing.T) {
BufferBuilder: &DiskBufferConfig{
MaxSize: 1234,
Path: "/var/log/testpath",
Sync: true,
},
},
},
Expand Down
24 changes: 15 additions & 9 deletions operator/buffer/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,20 @@ type DiskBufferConfig struct {
// TODO make this configurable in human-readable terms
MaxSize int `json:"max_size" yaml:"max_size"`
Path string `json:"path" yaml:"path"`
Sync bool `json:"sync" yaml:"sync"`
}

// NewDiskBufferConfig creates a new default disk buffer config
func NewDiskBufferConfig() *DiskBufferConfig {
return &DiskBufferConfig{
MaxSize: 1 << 32, // 4GiB
Sync: true,
}
}

func (c DiskBufferConfig) Build(context operator.BuildContext, _ string) (Buffer, error) {
b := NewDiskBuffer(c.MaxSize)
if err := b.Open(c.Path); err != nil {
if err := b.Open(c.Path, c.Sync); err != nil {
return nil, err
}
return b, nil
Expand Down Expand Up @@ -81,15 +83,19 @@ func NewDiskBuffer(maxDiskSize int) *DiskBuffer {
}

// Open opens the disk buffer files from a database directory
func (d *DiskBuffer) Open(path string) error {
func (d *DiskBuffer) Open(path string, sync bool) error {
var err error
dataPath := filepath.Join(path, "data")
if d.data, err = os.OpenFile(dataPath, os.O_CREATE|os.O_RDWR, 0755); err != nil {
flags := os.O_CREATE | os.O_RDWR
if sync {
flags |= os.O_SYNC
}
if d.data, err = os.OpenFile(dataPath, flags, 0755); err != nil {
return err
}

metadataPath := filepath.Join(path, "metadata")
if d.metadata, err = OpenMetadata(metadataPath); err != nil {
if d.metadata, err = OpenMetadata(metadataPath, sync); err != nil {
return err
}

Expand Down Expand Up @@ -178,10 +184,10 @@ func (d *DiskBuffer) addUnreadCount(i int64) {
}

// ReadWait reads entries from the buffer, waiting until either there are enough entries in the
// buffer to fill dst, or an event is sent down the timeout channel. This amortizes the cost
// of reading from the disk. It returns a function that, when called, marks the read entries as
// flushed, the number of entries read, and an error.
func (d *DiskBuffer) ReadWait(dst []*entry.Entry, timeout <-chan time.Time) (func(), int, error) {
// buffer to fill dst or the context is cancelled. This amortizes the cost of reading from the
// disk. It returns a function that, when called, marks the read entries as flushed, the
// number of entries read, and an error.
func (d *DiskBuffer) ReadWait(ctx context.Context, dst []*entry.Entry) (func(), int, error) {
d.readerLock.Lock()
defer d.readerLock.Unlock()

Expand All @@ -193,7 +199,7 @@ LOOP:
if n >= int64(len(dst)) {
break LOOP
}
case <-timeout:
case <-ctx.Done():
break LOOP
}
}
Expand Down
8 changes: 6 additions & 2 deletions operator/buffer/disk_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,15 @@ type Metadata struct {
}

// OpenMetadata opens and parses the metadata
func OpenMetadata(path string) (*Metadata, error) {
func OpenMetadata(path string, sync bool) (*Metadata, error) {
m := &Metadata{}

var err error
if m.file, err = os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0755); err != nil {
flags := os.O_CREATE | os.O_RDWR
if sync {
flags |= os.O_SYNC
}
if m.file, err = os.OpenFile(path, flags, 0755); err != nil {
return &Metadata{}, err
}

Expand Down
109 changes: 76 additions & 33 deletions operator/buffer/disk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
func openBuffer(t testing.TB) *DiskBuffer {
buffer := NewDiskBuffer(1 << 20)
dir := testutil.NewTempDir(t)
err := buffer.Open(dir)
err := buffer.Open(dir, false)
require.NoError(t, err)
t.Cleanup(func() { buffer.Close() })
return buffer
Expand Down Expand Up @@ -130,7 +130,7 @@ func TestDiskBuffer(t *testing.T) {
t.Parallel()
b := NewDiskBuffer(1 << 30)
dir := testutil.NewTempDir(t)
err := b.Open(dir)
err := b.Open(dir, false)
require.NoError(t, err)

writeN(t, b, 20, 0)
Expand All @@ -139,7 +139,7 @@ func TestDiskBuffer(t *testing.T) {
require.NoError(t, err)

b2 := NewDiskBuffer(1 << 30)
err = b2.Open(dir)
err = b2.Open(dir, false)
require.NoError(t, err)
readN(t, b2, 20, 0)
})
Expand All @@ -148,7 +148,7 @@ func TestDiskBuffer(t *testing.T) {
t.Parallel()
b := NewDiskBuffer(1 << 30)
dir := testutil.NewTempDir(t)
err := b.Open(dir)
err := b.Open(dir, false)
require.NoError(t, err)

writeN(t, b, 20, 0)
Expand All @@ -157,7 +157,7 @@ func TestDiskBuffer(t *testing.T) {
require.NoError(t, err)

b2 := NewDiskBuffer(1 << 30)
err = b2.Open(dir)
err = b2.Open(dir, false)
require.NoError(t, err)
readN(t, b2, 10, 10)
})
Expand All @@ -173,7 +173,7 @@ func TestDiskBuffer(t *testing.T) {

b := NewDiskBuffer(1 << 30)
dir := testutil.NewTempDir(t)
err := b.Open(dir)
err := b.Open(dir, false)
require.NoError(t, err)

writes := 0
Expand Down Expand Up @@ -204,32 +204,75 @@ func TestDiskBuffer(t *testing.T) {
}

func BenchmarkDiskBuffer(b *testing.B) {
buffer := openBuffer(b)
var wg sync.WaitGroup

wg.Add(1)
go func() {
defer wg.Done()
fmt.Printf("Benchmark: %d\n", b.N)
e := entry.New()
e.Record = "test log"
ctx := context.Background()
for i := 0; i < b.N; i++ {
panicOnErr(buffer.Add(ctx, e))
}
}()

wg.Add(1)
go func() {
defer wg.Done()
dst := make([]*entry.Entry, 1000)
for i := 0; i < b.N; {
flush, n, err := buffer.ReadWait(dst, time.After(50*time.Millisecond))
panicOnErr(err)
i += n
flush()
}
}()
b.Run("NoSync", func(b *testing.B) {
buffer := openBuffer(b)
var wg sync.WaitGroup

wg.Add(1)
go func() {
defer wg.Done()
fmt.Printf("Benchmark: %d\n", b.N)
e := entry.New()
e.Record = "test log"
ctx := context.Background()
for i := 0; i < b.N; i++ {
panicOnErr(buffer.Add(ctx, e))
}
}()

wg.Add(1)
go func() {
defer wg.Done()
dst := make([]*entry.Entry, 1000)
ctx := context.Background()
for i := 0; i < b.N; {
ctx, cancel := context.WithTimeout(ctx, 50*time.Millisecond)
flush, n, err := buffer.ReadWait(ctx, dst)
cancel()
panicOnErr(err)
i += n
flush()
}
}()

wg.Wait()
})

b.Run("Sync", func(b *testing.B) {
buffer := NewDiskBuffer(1 << 20)
dir := testutil.NewTempDir(b)
err := buffer.Open(dir, true)
require.NoError(b, err)
b.Cleanup(func() { buffer.Close() })
var wg sync.WaitGroup

wg.Add(1)
go func() {
defer wg.Done()
fmt.Printf("Benchmark: %d\n", b.N)
e := entry.New()
e.Record = "test log"
ctx := context.Background()
for i := 0; i < b.N; i++ {
panicOnErr(buffer.Add(ctx, e))
}
}()

wg.Wait()
wg.Add(1)
go func() {
defer wg.Done()
dst := make([]*entry.Entry, 1000)
ctx := context.Background()
for i := 0; i < b.N; {
ctx, cancel := context.WithTimeout(ctx, 50*time.Millisecond)
flush, n, err := buffer.ReadWait(ctx, dst)
cancel()
panicOnErr(err)
i += n
flush()
}
}()

wg.Wait()
})
}
29 changes: 15 additions & 14 deletions operator/buffer/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@ import (
"bytes"
"context"
"encoding/binary"
"encoding/gob"
"encoding/json"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/observiq/stanza/entry"
"github.com/observiq/stanza/operator"
Expand Down Expand Up @@ -60,7 +59,7 @@ func (m *MemoryBuffer) Add(ctx context.Context, e *entry.Entry) error {
case m.buf <- e:
return nil
case <-ctx.Done():
return fmt.Errorf("context cancelled")
return ctx.Err()
}
}

Expand All @@ -84,7 +83,7 @@ func (m *MemoryBuffer) Read(dst []*entry.Entry) (func(), int, error) {
return m.newFlushFunc(inFlight[:i]), i, nil
}

func (m *MemoryBuffer) ReadWait(dst []*entry.Entry, timeout <-chan time.Time) (func(), int, error) {
func (m *MemoryBuffer) ReadWait(ctx context.Context, dst []*entry.Entry) (func(), int, error) {
inFlightIDs := make([]uint64, len(dst))
i := 0
for ; i < len(dst); i++ {
Expand All @@ -96,7 +95,7 @@ func (m *MemoryBuffer) ReadWait(dst []*entry.Entry, timeout <-chan time.Time) (f
m.inFlight[id] = e
m.inFlightMux.Unlock()
inFlightIDs[i] = id
case <-timeout:
case <-ctx.Done():
return m.newFlushFunc(inFlightIDs[:i]), i, nil
}
}
Expand Down Expand Up @@ -133,21 +132,23 @@ func (m *MemoryBuffer) Close() error {
}
}

close(m.buf)
for e := range m.buf {
m.entryID++
if err := putKeyValue(b, m.entryID, e); err != nil {
return err
for {
select {
case e := <-m.buf:
m.entryID++
if err := putKeyValue(b, m.entryID, e); err != nil {
return err
}
default:
return nil
}
}

return nil
})
}

func putKeyValue(b *bbolt.Bucket, k uint64, v *entry.Entry) error {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
enc := json.NewEncoder(&buf)
key := [8]byte{}

binary.LittleEndian.PutUint64(key[:], k)
Expand All @@ -174,7 +175,7 @@ func (m *MemoryBuffer) loadFromDB() error {
return fmt.Errorf("max_entries is smaller than the number of entries stored in the database")
}

dec := gob.NewDecoder(bytes.NewReader(v))
dec := json.NewDecoder(bytes.NewReader(v))
var e entry.Entry
if err := dec.Decode(&e); err != nil {
return err
Expand Down
27 changes: 26 additions & 1 deletion operator/buffer/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,28 @@ func TestMemoryBuffer(t *testing.T) {
readN(t, b, 10, 10)
})

t.Run("CheckN", func(t *testing.T) {
t.Run("Read", func(t *testing.T) {
b := newMemoryBuffer(t)
writeN(t, b, 12, 0)
dst := make([]*entry.Entry, 30)
_, n, err := b.Read(dst)
require.NoError(t, err)
require.Equal(t, n, 12)
})

t.Run("ReadWait", func(t *testing.T) {
b := newMemoryBuffer(t)
writeN(t, b, 12, 0)
dst := make([]*entry.Entry, 30)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
_, n, err := b.ReadWait(ctx, dst)
require.NoError(t, err)
require.Equal(t, n, 12)
})
})

t.Run("SingleReadWaitMultipleWrites", func(t *testing.T) {
t.Parallel()
b := newMemoryBuffer(t)
Expand Down Expand Up @@ -173,8 +195,11 @@ func BenchmarkMemoryBuffer(b *testing.B) {
go func() {
defer wg.Done()
dst := make([]*entry.Entry, 1000)
ctx := context.Background()
for i := 0; i < b.N; {
flush, n, err := buffer.ReadWait(dst, time.After(50*time.Millisecond))
ctx, cancel := context.WithTimeout(ctx, 50*time.Millisecond)
flush, n, err := buffer.ReadWait(ctx, dst)
cancel()
panicOnErr(err)
i += n
go func() {
Expand Down
Loading

0 comments on commit 51d296b

Please sign in to comment.