Merged in disk-buffers
djaglowski committed Sep 3, 2020
2 parents 7a52d76 + 0961983 commit e620fd2
Showing 35 changed files with 1,667 additions and 6,737 deletions.
go.mod: 9 changes (7 additions, 2 deletions)
@@ -4,13 +4,18 @@ go 1.14

require (
github.com/antonmedv/expr v1.8.8
github.com/cenkalti/backoff/v4 v4.0.2
github.com/observiq/ctimefmt v1.0.0
github.com/stretchr/objx v0.3.0 // indirect
github.com/stretchr/testify v1.6.1
go.etcd.io/bbolt v1.3.5
go.uber.org/zap v1.15.0
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 // indirect
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
golang.org/x/sys v0.0.0-20200828194041-157a740278f4 // indirect
golang.org/x/tools v0.0.0-20200828161849-5deb26317202 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
gonum.org/v1/gonum v0.8.1
google.golang.org/api v0.31.0
gopkg.in/yaml.v2 v2.3.0
honnef.co/go/tools v0.0.1-2020.1.4 // indirect
)
go.sum: 303 changes (1 addition, 302 deletions)

Large diffs are not rendered by default.

operator/buffer/buffer.go: 104 changes (52 additions, 52 deletions)
@@ -2,77 +2,77 @@ package buffer

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/observiq/stanza/entry"
"github.com/observiq/stanza/errors"
"github.com/observiq/stanza/operator"
)

// Buffer is an entity that buffers log entries to an operator
type Buffer interface {
Flush(context.Context) error
Add(interface{}, int) error
AddWait(context.Context, interface{}, int) error
SetHandler(BundleHandler)
Process(context.Context, *entry.Entry) error
Add(context.Context, *entry.Entry) error
Read([]*entry.Entry) (func(), int, error)
ReadWait([]*entry.Entry, <-chan time.Time) (func(), int, error)
Close() error
}

type Config struct {
Type string `json:"type" yaml:"type"`
BufferBuilder
}

// NewConfig creates a new buffer config
func NewConfig() Config {
return Config{
BufferType: "memory",
DelayThreshold: operator.Duration{Duration: time.Second},
BundleCountThreshold: 10_000,
BundleByteThreshold: 4 * 1024 * 1024 * 1024, // 4MB
BundleByteLimit: 4 * 1024 * 1024 * 1024, // 4MB
BufferedByteLimit: 500 * 1024 * 1024 * 1024, // 500MB
HandlerLimit: 16,
Retry: NewRetryConfig(),
Type: "memory",
BufferBuilder: &MemoryBufferConfig{
MaxEntries: 1 << 20,
},
}
}

// Config is the configuration of a buffer
type Config struct {
BufferType string `json:"type,omitempty" yaml:"type,omitempty"`
DelayThreshold operator.Duration `json:"delay_threshold,omitempty" yaml:"delay_threshold,omitempty"`
BundleCountThreshold int `json:"bundle_count_threshold,omitempty" yaml:"buffer_count_threshold,omitempty"`
BundleByteThreshold int `json:"bundle_byte_threshold,omitempty" yaml:"bundle_byte_threshold,omitempty"`
BundleByteLimit int `json:"bundle_byte_limit,omitempty" yaml:"bundle_byte_limit,omitempty"`
BufferedByteLimit int `json:"buffered_byte_limit,omitempty" yaml:"buffered_byte_limit,omitempty"`
HandlerLimit int `json:"handler_limit,omitempty" yaml:"handler_limit,omitempty"`
Retry RetryConfig `json:"retry,omitempty" yaml:"retry,omitempty"`
type BufferBuilder interface {
Build(context operator.BuildContext, pluginID string) (Buffer, error)
}

// Build will build a buffer from the supplied configuration
func (config *Config) Build() (Buffer, error) {
switch config.BufferType {
case "memory", "":
return NewMemoryBuffer(config), nil
default:
return nil, errors.NewError(
fmt.Sprintf("Invalid buffer type %s", config.BufferType),
"The only supported buffer type is 'memory'",
)
}
func (bc *Config) UnmarshalJSON(data []byte) error {
return bc.unmarshal(func(dst interface{}) error {
return json.Unmarshal(data, dst)
})
}

// NewRetryConfig creates a new retry config
func NewRetryConfig() RetryConfig {
return RetryConfig{
InitialInterval: operator.Duration{Duration: 500 * time.Millisecond},
RandomizationFactor: 0.5,
Multiplier: 1.5,
MaxInterval: operator.Duration{Duration: 15 * time.Minute},
}
func (bc *Config) UnmarshalYAML(f func(interface{}) error) error {
return bc.unmarshal(f)
}

// RetryConfig is the configuration of an entity that will retry processing after an error
type RetryConfig struct {
InitialInterval operator.Duration `json:"initial_interval,omitempty" yaml:"initial_interval,omitempty"`
RandomizationFactor float64 `json:"randomization_factor,omitempty" yaml:"randomization_factor,omitempty"`
Multiplier float64 `json:"multiplier,omitempty" yaml:"multiplier,omitempty"`
MaxInterval operator.Duration `json:"max_interval,omitempty" yaml:"max_interval,omitempty"`
MaxElapsedTime operator.Duration `json:"max_elapsed_time,omitempty" yaml:"max_elapsed_time,omitempty"`
func (bc *Config) unmarshal(unmarshal func(interface{}) error) error {
var typeStruct struct {
Type string
}
err := unmarshal(&typeStruct)
if err != nil {
return err
}
bc.Type = typeStruct.Type

switch bc.Type {
case "memory":
mbc := NewMemoryBufferConfig()
err := unmarshal(mbc)
if err != nil {
return err
}
bc.BufferBuilder = mbc
case "disk":
dbc := NewDiskBufferConfig()
err := unmarshal(dbc)
if err != nil {
return err
}
bc.BufferBuilder = dbc
default:
return fmt.Errorf("unknown buffer type '%s'", bc.Type)
}

return nil
}
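
For context, the sketch below is not part of this commit; it only illustrates how the new polymorphic buffer config can be consumed. YAML is unmarshaled into buffer.Config, which reads the type field and then unmarshals the remaining fields into the matching concrete builder. The field names MaxSize, Path, and MaxEntries are taken from the test cases later in this diff; everything else is illustrative.

package main

import (
	"fmt"
	"log"

	"github.com/observiq/stanza/operator/buffer"
	yaml "gopkg.in/yaml.v2"
)

func main() {
	raw := []byte("type: disk\nmax_size: 1234\npath: /var/log/testpath\n")

	// UnmarshalYAML first reads "type", then unmarshals the rest of the
	// document into the concrete builder for that type (memory or disk).
	var cfg buffer.Config
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		log.Fatal(err)
	}

	// The embedded BufferBuilder now holds the type-specific configuration.
	switch b := cfg.BufferBuilder.(type) {
	case *buffer.DiskBufferConfig:
		fmt.Printf("disk buffer: max_size=%d path=%s\n", b.MaxSize, b.Path)
	case *buffer.MemoryBufferConfig:
		fmt.Printf("memory buffer: max_entries=%d\n", b.MaxEntries)
	default:
		fmt.Println("unknown builder type")
	}
}

JSON input follows the same path through Config.UnmarshalJSON, which dispatches to the same unmarshal helper.
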
operator/buffer/buffer_test.go: 112 changes (25 additions, 87 deletions)
@@ -1,109 +1,47 @@
package buffer

import (
"context"
"encoding/json"
"sync"
"testing"
"time"

"github.com/observiq/stanza/entry"
"github.com/observiq/stanza/operator"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
yaml "gopkg.in/yaml.v2"
)

type bufferHandler struct {
flushed []*entry.Entry
mux sync.Mutex
notify chan struct{}
}

func (b *bufferHandler) ProcessMulti(ctx context.Context, entries []*entry.Entry) error {
b.mux.Lock()
b.flushed = append(b.flushed, entries...)
b.mux.Unlock()
b.notify <- struct{}{}
return nil
}

func (b *bufferHandler) Logger() *zap.SugaredLogger {
return nil
}

func TestBuffer(t *testing.T) {
config := NewConfig()
config.DelayThreshold = operator.Duration{
Duration: 100 * time.Millisecond,
}

buf := NewMemoryBuffer(&config)
numEntries := 10000

bh := bufferHandler{
flushed: make([]*entry.Entry, 0),
notify: make(chan struct{}),
}
buf.SetHandler(&bh)

for i := 0; i < numEntries; i++ {
err := buf.AddWait(context.Background(), entry.New(), 0)
require.NoError(t, err)
}

for {
select {
case <-bh.notify:
bh.mux.Lock()
if len(bh.flushed) == numEntries {
bh.mux.Unlock()
return
}
bh.mux.Unlock()
case <-time.After(time.Second):
require.FailNow(t, "timed out waiting for all entries to be flushed")
}
}
}

func TestBufferSerializationRoundtrip(t *testing.T) {
func TestBufferUnmarshalYAML(t *testing.T) {
cases := []struct {
name string
config Config
name string
input []byte
expected Config
}{
{
"zeros",
Config{},
"SimpleMemory",
[]byte("type: memory\nmax_entries: 30\n"),
Config{
Type: "memory",
BufferBuilder: &MemoryBufferConfig{
MaxEntries: 30,
},
},
},
{
"defaults",
NewConfig(),
"SimpleDisk",
[]byte("type: disk\nmax_size: 1234\npath: /var/log/testpath\n"),
Config{
Type: "disk",
BufferBuilder: &DiskBufferConfig{
MaxSize: 1234,
Path: "/var/log/testpath",
},
},
},
}

for _, tc := range cases {
t.Run("yaml "+tc.name, func(t *testing.T) {
cfgBytes, err := yaml.Marshal(tc.config)
require.NoError(t, err)

var cfg Config
err = yaml.UnmarshalStrict(cfgBytes, &cfg)
require.NoError(t, err)

require.Equal(t, tc.config, cfg)
})

t.Run("json "+tc.name, func(t *testing.T) {
tc := tc
cfgBytes, err := json.Marshal(tc.config)
t.Run(tc.name, func(t *testing.T) {
var b Config
err := yaml.Unmarshal(tc.input, &b)
require.NoError(t, err)

var cfg Config
err = json.Unmarshal(cfgBytes, &cfg)
require.NoError(t, err)

require.Equal(t, tc.config, cfg)
require.Equal(t, tc.expected, b)
})
}
}
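
The reworked tests only cover config unmarshaling, so as a rough usage note: the sketch below is an assumption-laden illustration, not code from this commit. It walks through the new Add/ReadWait/Close cycle of the Buffer interface, assuming buf was already built via Config.Build with a suitable operator.BuildContext, and assuming the func() returned by ReadWait acknowledges the entries just read so a disk-backed buffer will not re-deliver them; neither detail is spelled out in this diff.

package main

import (
	"context"
	"time"

	"github.com/observiq/stanza/entry"
	"github.com/observiq/stanza/operator/buffer"
)

// drain writes a few entries into buf and reads them back using the new
// interface. buf is assumed to be an already-built Buffer.
func drain(ctx context.Context, buf buffer.Buffer) error {
	// Producer side: entries are added one at a time with a context.
	for i := 0; i < 5; i++ {
		if err := buf.Add(ctx, entry.New()); err != nil {
			return err
		}
	}

	// Consumer side: ReadWait fills dst, waiting until entries are available
	// or the supplied timeout channel fires, and reports how many were read.
	dst := make([]*entry.Entry, 10)
	timeout := time.After(100 * time.Millisecond)
	markFlushed, n, err := buf.ReadWait(dst, timeout)
	if err != nil {
		return err
	}
	_ = dst[:n] // send these entries downstream

	// Assumption: calling the returned func() marks the read entries as
	// flushed so they are not handed out again.
	markFlushed()

	return buf.Close()
}

func main() {
	// Building a real Buffer needs an operator.BuildContext, which this
	// commit page does not show, so drain is only referenced here.
	_ = drain
}
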
0 comments on commit e620fd2