Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Submodules #108

Merged
merged 97 commits into from
Sep 11, 2020
Merged
Changes from 31 commits
Commits
Show all changes
97 commits
Select commit Hold shift + click to select a range
ca5a6cf
Renamed package to stanza
djaglowski Aug 24, 2020
bbb31de
Merge branch 'master' into stanza
djaglowski Aug 24, 2020
299e800
Updated new operator package
djaglowski Aug 24, 2020
70b145b
Updated changelog ahead of release
djaglowski Aug 24, 2020
44c9adb
Submodule exploration
camdencheek Aug 27, 2020
1dfcc6e
Finished preliminary reorg of operators into modules
djaglowski Sep 1, 2020
46841b9
Merged master
djaglowski Sep 1, 2020
c26f895
Fixed google cloud test that was failing on infinite loop
djaglowski Sep 1, 2020
af46c61
Fix all tests
djaglowski Sep 1, 2020
f51025f
Merged master
djaglowski Sep 1, 2020
420acc7
Update changelog
djaglowski Sep 1, 2020
0fb2bfd
Fix CI build
djaglowski Sep 1, 2020
25b9986
Rename init.go to init_common.go, so that it will not be ignored by a…
djaglowski Sep 1, 2020
0a75e8f
WIP - try fixing tests on windows CI
djaglowski Sep 1, 2020
8567b61
WIP - Try another windows testing approach
djaglowski Sep 1, 2020
32ffdc7
Try uploading joined coverage file
djaglowski Sep 1, 2020
076c485
Run all unit tests for windows, and report all coverage
djaglowski Sep 1, 2020
aa15f29
Fix return paths on windows ci tests
djaglowski Sep 1, 2020
e0a3134
Try more powershelly command
djaglowski Sep 1, 2020
0cd0842
Clean up makefile and try codecov upload with glob
djaglowski Sep 1, 2020
980cfc3
WIP
camdencheek Aug 11, 2020
1428a52
WIP
camdencheek Aug 12, 2020
14c1b3d
Make disk buffer work
camdencheek Aug 21, 2020
d5019df
Add ReadWait
camdencheek Aug 21, 2020
56a019e
Make benchmark for disk buffer
camdencheek Aug 21, 2020
560bb1f
WIP
camdencheek Aug 24, 2020
6e1c1a1
WIP
camdencheek Aug 25, 2020
7fe5b11
WORKING
camdencheek Aug 25, 2020
1a90c8d
Remove debug printlns
camdencheek Aug 25, 2020
9286e20
WIP broken
camdencheek Aug 26, 2020
fcb7c7c
Rename to stanza
camdencheek Aug 27, 2020
2a43d87
WIP
camdencheek Aug 27, 2020
711555a
WIP
camdencheek Aug 28, 2020
e6ec767
WORKING
camdencheek Aug 28, 2020
364fabe
Remove unnecessary counter
camdencheek Aug 28, 2020
9da4a59
Clean up unused
camdencheek Aug 28, 2020
0f8ccd0
Start of memory buffer
camdencheek Aug 31, 2020
e957920
Add slow memory buffer implementation
camdencheek Aug 31, 2020
b425ea5
Reorganize package
camdencheek Aug 31, 2020
d54d251
Update comments
camdencheek Aug 31, 2020
c3f9c28
WIP
camdencheek Aug 31, 2020
f9c001e
Improve performance by only seeking when necessary
camdencheek Aug 31, 2020
ebe0ad0
WIP
camdencheek Aug 31, 2020
24308f1
WIP
camdencheek Sep 1, 2020
d188aab
WIP
camdencheek Sep 1, 2020
fa51beb
Fix failure to release semaphore
camdencheek Sep 1, 2020
e0c2b94
Some code hygiene
camdencheek Sep 1, 2020
6a2a16d
Add some small tests
camdencheek Sep 1, 2020
71f9159
Make NewConfig return a pointer
camdencheek Sep 1, 2020
0961983
Fix tests
camdencheek Sep 1, 2020
8b75cd5
Tidy
djaglowski Sep 1, 2020
7a52d76
Fix make tidy target
djaglowski Sep 2, 2020
e620fd2
Merged in disk-buffers
djaglowski Sep 3, 2020
ee559ea
Fix tests and integrate with Google Cloud
camdencheek Sep 3, 2020
a04e59b
Fix remaining tests
camdencheek Sep 3, 2020
a796934
Appease linter
camdencheek Sep 3, 2020
51d296b
Merge remote-tracking branch 'origin/disk-buffer' into submod-diskbuff
djaglowski Sep 3, 2020
747ac8e
Merged disk buffers again
djaglowski Sep 3, 2020
a75331f
Add comments to public functions
camdencheek Sep 3, 2020
1e8d25a
Add test for closing and reopening
camdencheek Sep 3, 2020
a2c603e
Add comments to flusher
camdencheek Sep 3, 2020
3bdb9d8
Remove TODO
camdencheek Sep 3, 2020
f0051fd
Update diskSizeSemaphore comment
camdencheek Sep 3, 2020
17ce9ab
Update changelog
camdencheek Sep 3, 2020
f184e0d
Fis issue with creating files in the buffer package
camdencheek Sep 3, 2020
dd952bf
Merged disk-buffer again
djaglowski Sep 3, 2020
5b6b775
Add disk buffers
camdencheek Sep 3, 2020
59b2bb6
Tidy dependencies
camdencheek Sep 3, 2020
2da5b7a
Merged disk-buffer again
djaglowski Sep 3, 2020
18add81
Fix all existing tests
camdencheek Sep 7, 2020
dd629c1
Move pollForNewFiles into its own method
camdencheek Sep 8, 2020
e7934c6
Deduplicate NewFileReader
camdencheek Sep 8, 2020
580784c
Fix race condition
camdencheek Sep 8, 2020
f8965c2
Resolved merge conflicts
djaglowski Sep 8, 2020
8c53231
Update tests
camdencheek Sep 8, 2020
e8ca3a8
Fix some lints
camdencheek Sep 8, 2020
48bbe84
Minor fixes
camdencheek Sep 8, 2020
174c138
Close file during move on Windows
camdencheek Sep 8, 2020
8e927d4
Merge branch 'submod-diskbuff' into rc-0.10.0
djaglowski Sep 8, 2020
1e5d36c
Fixed failing test
djaglowski Sep 8, 2020
5a26d73
Added comments, cleaned up stutter
djaglowski Sep 8, 2020
8156ab6
Added multi file test
djaglowski Sep 8, 2020
60b8b03
Improved coverage (#110)
jmwilliams89 Sep 9, 2020
53457f2
Added file rotation test
djaglowski Sep 9, 2020
ff56186
Merged master
djaglowski Sep 9, 2020
a20666b
Merged in master, and improved test coverage
djaglowski Sep 9, 2020
99f3ed5
Merged in submodules
djaglowski Sep 9, 2020
6276c2f
Add LastSeenTime to readers
camdencheek Sep 9, 2020
07dc982
Fix data race
camdencheek Sep 9, 2020
6c8f2d4
Synchronize reading to simplify logic
camdencheek Sep 9, 2020
e4ed15e
Update fingerprint on initialize
camdencheek Sep 9, 2020
dbb2eb1
Add comments
camdencheek Sep 9, 2020
3f6a663
Remove unnecessary setOffset function
camdencheek Sep 9, 2020
a0422ff
Update fingerprint on truncate
camdencheek Sep 9, 2020
82d351e
Add a few tests
camdencheek Sep 9, 2020
49d23ab
Ignore empty lines
camdencheek Sep 9, 2020
9bc0838
File fixes (#113)
camdencheek Sep 11, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -4,13 +4,18 @@ go 1.14

require (
github.com/antonmedv/expr v1.8.8
github.com/cenkalti/backoff/v4 v4.0.2
github.com/observiq/ctimefmt v1.0.0
github.com/stretchr/objx v0.3.0 // indirect
github.com/stretchr/testify v1.6.1
go.etcd.io/bbolt v1.3.5
go.uber.org/zap v1.15.0
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 // indirect
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
golang.org/x/sys v0.0.0-20200828194041-157a740278f4 // indirect
golang.org/x/tools v0.0.0-20200828161849-5deb26317202 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
gonum.org/v1/gonum v0.8.1
google.golang.org/api v0.31.0
gopkg.in/yaml.v2 v2.3.0
honnef.co/go/tools v0.0.1-2020.1.4 // indirect
)
303 changes: 1 addition & 302 deletions go.sum

Large diffs are not rendered by default.

104 changes: 52 additions & 52 deletions operator/buffer/buffer.go
Original file line number Diff line number Diff line change
@@ -2,77 +2,77 @@ package buffer

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/observiq/stanza/entry"
"github.com/observiq/stanza/errors"
"github.com/observiq/stanza/operator"
)

// Buffer is an entity that buffers log entries to an operator
type Buffer interface {
Flush(context.Context) error
Add(interface{}, int) error
AddWait(context.Context, interface{}, int) error
SetHandler(BundleHandler)
Process(context.Context, *entry.Entry) error
Add(context.Context, *entry.Entry) error
Read([]*entry.Entry) (func(), int, error)
ReadWait([]*entry.Entry, <-chan time.Time) (func(), int, error)
Close() error
}

type Config struct {
Type string `json:"type" yaml:"type"`
BufferBuilder
}

// NewConfig creates a new buffer config
func NewConfig() Config {
return Config{
BufferType: "memory",
DelayThreshold: operator.Duration{Duration: time.Second},
BundleCountThreshold: 10_000,
BundleByteThreshold: 4 * 1024 * 1024 * 1024, // 4MB
BundleByteLimit: 4 * 1024 * 1024 * 1024, // 4MB
BufferedByteLimit: 500 * 1024 * 1024 * 1024, // 500MB
HandlerLimit: 16,
Retry: NewRetryConfig(),
Type: "memory",
BufferBuilder: &MemoryBufferConfig{
MaxEntries: 1 << 20,
},
}
}

// Config is the configuration of a buffer
type Config struct {
BufferType string `json:"type,omitempty" yaml:"type,omitempty"`
DelayThreshold operator.Duration `json:"delay_threshold,omitempty" yaml:"delay_threshold,omitempty"`
BundleCountThreshold int `json:"bundle_count_threshold,omitempty" yaml:"buffer_count_threshold,omitempty"`
BundleByteThreshold int `json:"bundle_byte_threshold,omitempty" yaml:"bundle_byte_threshold,omitempty"`
BundleByteLimit int `json:"bundle_byte_limit,omitempty" yaml:"bundle_byte_limit,omitempty"`
BufferedByteLimit int `json:"buffered_byte_limit,omitempty" yaml:"buffered_byte_limit,omitempty"`
HandlerLimit int `json:"handler_limit,omitempty" yaml:"handler_limit,omitempty"`
Retry RetryConfig `json:"retry,omitempty" yaml:"retry,omitempty"`
type BufferBuilder interface {
Build(context operator.BuildContext, pluginID string) (Buffer, error)
}

// Build will build a buffer from the supplied configuration
func (config *Config) Build() (Buffer, error) {
switch config.BufferType {
case "memory", "":
return NewMemoryBuffer(config), nil
default:
return nil, errors.NewError(
fmt.Sprintf("Invalid buffer type %s", config.BufferType),
"The only supported buffer type is 'memory'",
)
}
func (bc *Config) UnmarshalJSON(data []byte) error {
return bc.unmarshal(func(dst interface{}) error {
return json.Unmarshal(data, dst)
})
}

// NewRetryConfig creates a new retry config
func NewRetryConfig() RetryConfig {
return RetryConfig{
InitialInterval: operator.Duration{Duration: 500 * time.Millisecond},
RandomizationFactor: 0.5,
Multiplier: 1.5,
MaxInterval: operator.Duration{Duration: 15 * time.Minute},
}
func (bc *Config) UnmarshalYAML(f func(interface{}) error) error {
return bc.unmarshal(f)
}

// RetryConfig is the configuration of an entity that will retry processing after an error
type RetryConfig struct {
InitialInterval operator.Duration `json:"initial_interval,omitempty" yaml:"initial_interval,omitempty"`
RandomizationFactor float64 `json:"randomization_factor,omitempty" yaml:"randomization_factor,omitempty"`
Multiplier float64 `json:"multiplier,omitempty" yaml:"multiplier,omitempty"`
MaxInterval operator.Duration `json:"max_interval,omitempty" yaml:"max_interval,omitempty"`
MaxElapsedTime operator.Duration `json:"max_elapsed_time,omitempty" yaml:"max_elapsed_time,omitempty"`
func (bc *Config) unmarshal(unmarshal func(interface{}) error) error {
var typeStruct struct {
Type string
}
err := unmarshal(&typeStruct)
if err != nil {
return err
}
bc.Type = typeStruct.Type

switch bc.Type {
case "memory":
mbc := NewMemoryBufferConfig()
err := unmarshal(mbc)
if err != nil {
return err
}
bc.BufferBuilder = mbc
case "disk":
dbc := NewDiskBufferConfig()
err := unmarshal(dbc)
if err != nil {
return err
}
bc.BufferBuilder = dbc
default:
return fmt.Errorf("unknown buffer type '%s'", bc.Type)
}

return nil
}
112 changes: 25 additions & 87 deletions operator/buffer/buffer_test.go
Original file line number Diff line number Diff line change
@@ -1,109 +1,47 @@
package buffer

import (
"context"
"encoding/json"
"sync"
"testing"
"time"

"github.com/observiq/stanza/entry"
"github.com/observiq/stanza/operator"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
yaml "gopkg.in/yaml.v2"
)

type bufferHandler struct {
flushed []*entry.Entry
mux sync.Mutex
notify chan struct{}
}

func (b *bufferHandler) ProcessMulti(ctx context.Context, entries []*entry.Entry) error {
b.mux.Lock()
b.flushed = append(b.flushed, entries...)
b.mux.Unlock()
b.notify <- struct{}{}
return nil
}

func (b *bufferHandler) Logger() *zap.SugaredLogger {
return nil
}

func TestBuffer(t *testing.T) {
config := NewConfig()
config.DelayThreshold = operator.Duration{
Duration: 100 * time.Millisecond,
}

buf := NewMemoryBuffer(&config)
numEntries := 10000

bh := bufferHandler{
flushed: make([]*entry.Entry, 0),
notify: make(chan struct{}),
}
buf.SetHandler(&bh)

for i := 0; i < numEntries; i++ {
err := buf.AddWait(context.Background(), entry.New(), 0)
require.NoError(t, err)
}

for {
select {
case <-bh.notify:
bh.mux.Lock()
if len(bh.flushed) == numEntries {
bh.mux.Unlock()
return
}
bh.mux.Unlock()
case <-time.After(time.Second):
require.FailNow(t, "timed out waiting for all entries to be flushed")
}
}
}

func TestBufferSerializationRoundtrip(t *testing.T) {
func TestBufferUnmarshalYAML(t *testing.T) {
cases := []struct {
name string
config Config
name string
input []byte
expected Config
}{
{
"zeros",
Config{},
"SimpleMemory",
[]byte("type: memory\nmax_entries: 30\n"),
Config{
Type: "memory",
BufferBuilder: &MemoryBufferConfig{
MaxEntries: 30,
},
},
},
{
"defaults",
NewConfig(),
"SimpleDisk",
[]byte("type: disk\nmax_size: 1234\npath: /var/log/testpath\n"),
Config{
Type: "disk",
BufferBuilder: &DiskBufferConfig{
MaxSize: 1234,
Path: "/var/log/testpath",
},
},
},
}

for _, tc := range cases {
t.Run("yaml "+tc.name, func(t *testing.T) {
cfgBytes, err := yaml.Marshal(tc.config)
require.NoError(t, err)

var cfg Config
err = yaml.UnmarshalStrict(cfgBytes, &cfg)
require.NoError(t, err)

require.Equal(t, tc.config, cfg)
})

t.Run("json "+tc.name, func(t *testing.T) {
tc := tc
cfgBytes, err := json.Marshal(tc.config)
t.Run(tc.name, func(t *testing.T) {
var b Config
err := yaml.Unmarshal(tc.input, &b)
require.NoError(t, err)

var cfg Config
err = json.Unmarshal(cfgBytes, &cfg)
require.NoError(t, err)

require.Equal(t, tc.config, cfg)
require.Equal(t, tc.expected, b)
})
}
}
Loading