Skip to content

Commit

Permalink
Moved operator.Duration to helper.Duration
Browse files Browse the repository at this point in the history
  • Loading branch information
jmwilliams89 committed Sep 8, 2020
1 parent 98ae4cf commit e19f879
Show file tree
Hide file tree
Showing 12 changed files with 65 additions and 64 deletions.
34 changes: 17 additions & 17 deletions operator/buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

"github.com/observiq/stanza/entry"
"github.com/observiq/stanza/errors"
"github.com/observiq/stanza/operator"
"github.com/observiq/stanza/operator/helper"
)

// Buffer is an entity that buffers log entries to an operator
Expand All @@ -23,7 +23,7 @@ type Buffer interface {
func NewConfig() Config {
return Config{
BufferType: "memory",
DelayThreshold: operator.Duration{Duration: time.Second},
DelayThreshold: helper.Duration{Duration: time.Second},
BundleCountThreshold: 10_000,
BundleByteThreshold: 4 * 1024 * 1024 * 1024, // 4MB
BundleByteLimit: 4 * 1024 * 1024 * 1024, // 4MB
Expand All @@ -35,14 +35,14 @@ func NewConfig() Config {

// Config is the configuration of a buffer
type Config struct {
BufferType string `json:"type,omitempty" yaml:"type,omitempty"`
DelayThreshold operator.Duration `json:"delay_threshold,omitempty" yaml:"delay_threshold,omitempty"`
BundleCountThreshold int `json:"bundle_count_threshold,omitempty" yaml:"buffer_count_threshold,omitempty"`
BundleByteThreshold int `json:"bundle_byte_threshold,omitempty" yaml:"bundle_byte_threshold,omitempty"`
BundleByteLimit int `json:"bundle_byte_limit,omitempty" yaml:"bundle_byte_limit,omitempty"`
BufferedByteLimit int `json:"buffered_byte_limit,omitempty" yaml:"buffered_byte_limit,omitempty"`
HandlerLimit int `json:"handler_limit,omitempty" yaml:"handler_limit,omitempty"`
Retry RetryConfig `json:"retry,omitempty" yaml:"retry,omitempty"`
BufferType string `json:"type,omitempty" yaml:"type,omitempty"`
DelayThreshold helper.Duration `json:"delay_threshold,omitempty" yaml:"delay_threshold,omitempty"`
BundleCountThreshold int `json:"bundle_count_threshold,omitempty" yaml:"buffer_count_threshold,omitempty"`
BundleByteThreshold int `json:"bundle_byte_threshold,omitempty" yaml:"bundle_byte_threshold,omitempty"`
BundleByteLimit int `json:"bundle_byte_limit,omitempty" yaml:"bundle_byte_limit,omitempty"`
BufferedByteLimit int `json:"buffered_byte_limit,omitempty" yaml:"buffered_byte_limit,omitempty"`
HandlerLimit int `json:"handler_limit,omitempty" yaml:"handler_limit,omitempty"`
Retry RetryConfig `json:"retry,omitempty" yaml:"retry,omitempty"`
}

// Build will build a buffer from the supplied configuration
Expand All @@ -61,18 +61,18 @@ func (config *Config) Build() (Buffer, error) {
// NewRetryConfig creates a new retry config
func NewRetryConfig() RetryConfig {
return RetryConfig{
InitialInterval: operator.Duration{Duration: 500 * time.Millisecond},
InitialInterval: helper.Duration{Duration: 500 * time.Millisecond},
RandomizationFactor: 0.5,
Multiplier: 1.5,
MaxInterval: operator.Duration{Duration: 15 * time.Minute},
MaxInterval: helper.Duration{Duration: 15 * time.Minute},
}
}

// RetryConfig is the configuration of an entity that will retry processing after an error
type RetryConfig struct {
InitialInterval operator.Duration `json:"initial_interval,omitempty" yaml:"initial_interval,omitempty"`
RandomizationFactor float64 `json:"randomization_factor,omitempty" yaml:"randomization_factor,omitempty"`
Multiplier float64 `json:"multiplier,omitempty" yaml:"multiplier,omitempty"`
MaxInterval operator.Duration `json:"max_interval,omitempty" yaml:"max_interval,omitempty"`
MaxElapsedTime operator.Duration `json:"max_elapsed_time,omitempty" yaml:"max_elapsed_time,omitempty"`
InitialInterval helper.Duration `json:"initial_interval,omitempty" yaml:"initial_interval,omitempty"`
RandomizationFactor float64 `json:"randomization_factor,omitempty" yaml:"randomization_factor,omitempty"`
Multiplier float64 `json:"multiplier,omitempty" yaml:"multiplier,omitempty"`
MaxInterval helper.Duration `json:"max_interval,omitempty" yaml:"max_interval,omitempty"`
MaxElapsedTime helper.Duration `json:"max_elapsed_time,omitempty" yaml:"max_elapsed_time,omitempty"`
}
4 changes: 2 additions & 2 deletions operator/buffer/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"time"

"github.com/observiq/stanza/entry"
"github.com/observiq/stanza/operator"
"github.com/observiq/stanza/operator/helper"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
yaml "gopkg.in/yaml.v2"
Expand All @@ -34,7 +34,7 @@ func (b *bufferHandler) Logger() *zap.SugaredLogger {

func TestBuffer(t *testing.T) {
config := NewConfig()
config.DelayThreshold = operator.Duration{
config.DelayThreshold = helper.Duration{
Duration: 100 * time.Millisecond,
}

Expand Down
14 changes: 7 additions & 7 deletions operator/buffer/memory_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"time"

"github.com/observiq/stanza/entry"
"github.com/observiq/stanza/operator"
"github.com/observiq/stanza/operator/helper"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
Expand Down Expand Up @@ -47,7 +47,7 @@ func newMockHandler(t *testing.T) *mockHandler {
func TestMemoryBufferRetry(t *testing.T) {
t.Run("FailOnce", func(t *testing.T) {
cfg := NewConfig()
cfg.DelayThreshold = operator.Duration{Duration: 10 * time.Millisecond}
cfg.DelayThreshold = helper.Duration{Duration: 10 * time.Millisecond}
buffer, err := cfg.Build()
require.NoError(t, err)
handler := newMockHandler(t)
Expand All @@ -68,7 +68,7 @@ func TestMemoryBufferRetry(t *testing.T) {

t.Run("ContextCancelled", func(t *testing.T) {
cfg := NewConfig()
cfg.DelayThreshold = operator.Duration{Duration: 10 * time.Millisecond}
cfg.DelayThreshold = helper.Duration{Duration: 10 * time.Millisecond}
buffer, err := cfg.Build()
require.NoError(t, err)
handler := newMockHandler(t)
Expand All @@ -95,8 +95,8 @@ func TestMemoryBufferRetry(t *testing.T) {

t.Run("ExceededLimit", func(t *testing.T) {
cfg := NewConfig()
cfg.DelayThreshold = operator.Duration{Duration: 10 * time.Millisecond}
cfg.Retry.MaxElapsedTime = operator.Duration{Duration: time.Nanosecond}
cfg.DelayThreshold = helper.Duration{Duration: 10 * time.Millisecond}
cfg.Retry.MaxElapsedTime = helper.Duration{Duration: time.Nanosecond}
buffer, err := cfg.Build()
require.NoError(t, err)
handler := newMockHandler(t)
Expand Down Expand Up @@ -124,7 +124,7 @@ func TestMemoryBufferRetry(t *testing.T) {
func TestMemoryBufferFlush(t *testing.T) {
t.Run("Simple", func(t *testing.T) {
cfg := NewConfig()
cfg.DelayThreshold = operator.Duration{Duration: 10 * time.Hour}
cfg.DelayThreshold = helper.Duration{Duration: 10 * time.Hour}
buffer, err := cfg.Build()
require.NoError(t, err)
handler := newMockHandler(t)
Expand Down Expand Up @@ -156,7 +156,7 @@ func TestMemoryBufferFlush(t *testing.T) {

t.Run("ContextCancelled", func(t *testing.T) {
cfg := NewConfig()
cfg.DelayThreshold = operator.Duration{Duration: 10 * time.Hour}
cfg.DelayThreshold = helper.Duration{Duration: 10 * time.Hour}
buffer, err := cfg.Build()
require.NoError(t, err)
handler := newMockHandler(t)
Expand Down
16 changes: 8 additions & 8 deletions operator/builtin/input/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func init() {
func NewInputConfig(operatorID string) *InputConfig {
return &InputConfig{
InputConfig: helper.NewInputConfig(operatorID, "file_input"),
PollInterval: operator.Duration{Duration: 200 * time.Millisecond},
PollInterval: helper.Duration{Duration: 200 * time.Millisecond},
IncludeFileName: true,
IncludeFilePath: false,
StartAt: "end",
Expand All @@ -48,13 +48,13 @@ type InputConfig struct {
Include []string `json:"include,omitempty" yaml:"include,omitempty"`
Exclude []string `json:"exclude,omitempty" yaml:"exclude,omitempty"`

PollInterval operator.Duration `json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"`
Multiline *MultilineConfig `json:"multiline,omitempty" yaml:"multiline,omitempty"`
IncludeFileName bool `json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"`
IncludeFilePath bool `json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"`
StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"`
MaxLogSize int `json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"`
Encoding string `json:"encoding,omitempty" yaml:"encoding,omitempty"`
PollInterval helper.Duration `json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"`
Multiline *MultilineConfig `json:"multiline,omitempty" yaml:"multiline,omitempty"`
IncludeFileName bool `json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"`
IncludeFilePath bool `json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"`
StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"`
MaxLogSize int `json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"`
Encoding string `json:"encoding,omitempty" yaml:"encoding,omitempty"`
}

// MultilineConfig is the configuration a multiline operation
Expand Down
5 changes: 3 additions & 2 deletions operator/builtin/input/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/observiq/stanza/entry"
"github.com/observiq/stanza/operator"
"github.com/observiq/stanza/operator/helper"
"github.com/observiq/stanza/testutil"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand All @@ -27,7 +28,7 @@ func newTestFileSource(t *testing.T) (*InputOperator, chan *entry.Entry) {
})

cfg := NewInputConfig("testfile")
cfg.PollInterval = operator.Duration{Duration: 50 * time.Millisecond}
cfg.PollInterval = helper.Duration{Duration: 50 * time.Millisecond}
cfg.StartAt = "beginning"
cfg.Include = []string{"should-be-overwritten"}

Expand All @@ -50,7 +51,7 @@ func TestFileSource_Build(t *testing.T) {
cfg.OutputIDs = []string{"mock"}
cfg.Include = []string{"/var/log/testpath.*"}
cfg.Exclude = []string{"/var/log/testpath.ex*"}
cfg.PollInterval = operator.Duration{Duration: 10 * time.Millisecond}
cfg.PollInterval = helper.Duration{Duration: 10 * time.Millisecond}
return cfg
}

Expand Down
12 changes: 6 additions & 6 deletions operator/builtin/input/windows/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ func init() {
// EventLogConfig is the configuration of a windows event log operator.
type EventLogConfig struct {
helper.InputConfig `yaml:",inline"`
Channel string `json:"channel" yaml:"channel"`
MaxReads int `json:"max_reads,omitempty" yaml:"max_reads,omitempty"`
StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"`
PollInterval operator.Duration `json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"`
Channel string `json:"channel" yaml:"channel"`
MaxReads int `json:"max_reads,omitempty" yaml:"max_reads,omitempty"`
StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"`
PollInterval helper.Duration `json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"`
}

// Build will build a windows event log operator.
Expand Down Expand Up @@ -64,7 +64,7 @@ func NewDefaultConfig() operator.Builder {
InputConfig: helper.NewInputConfig("", "windows_eventlog_input"),
MaxReads: 100,
StartAt: "end",
PollInterval: operator.Duration{
PollInterval: helper.Duration{
Duration: 1 * time.Second,
},
}
Expand All @@ -79,7 +79,7 @@ type EventLogInput struct {
channel string
maxReads int
startAt string
pollInterval operator.Duration
pollInterval helper.Duration
offsets helper.Persister
cancel context.CancelFunc
wg *sync.WaitGroup
Expand Down
18 changes: 9 additions & 9 deletions operator/builtin/output/googlecloud/google_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func NewGoogleCloudOutputConfig(operatorID string) *GoogleCloudOutputConfig {
return &GoogleCloudOutputConfig{
OutputConfig: helper.NewOutputConfig(operatorID, "google_cloud_output"),
BufferConfig: buffer.NewConfig(),
Timeout: operator.Duration{Duration: 30 * time.Second},
Timeout: helper.Duration{Duration: 30 * time.Second},
UseCompression: true,
}
}
Expand All @@ -43,14 +43,14 @@ type GoogleCloudOutputConfig struct {
helper.OutputConfig `yaml:",inline"`
BufferConfig buffer.Config `json:"buffer,omitempty" yaml:"buffer,omitempty"`

Credentials string `json:"credentials,omitempty" yaml:"credentials,omitempty"`
CredentialsFile string `json:"credentials_file,omitempty" yaml:"credentials_file,omitempty"`
ProjectID string `json:"project_id" yaml:"project_id"`
LogNameField *entry.Field `json:"log_name_field,omitempty" yaml:"log_name_field,omitempty"`
TraceField *entry.Field `json:"trace_field,omitempty" yaml:"trace_field,omitempty"`
SpanIDField *entry.Field `json:"span_id_field,omitempty" yaml:"span_id_field,omitempty"`
Timeout operator.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"`
UseCompression bool `json:"use_compression,omitempty" yaml:"use_compression,omitempty"`
Credentials string `json:"credentials,omitempty" yaml:"credentials,omitempty"`
CredentialsFile string `json:"credentials_file,omitempty" yaml:"credentials_file,omitempty"`
ProjectID string `json:"project_id" yaml:"project_id"`
LogNameField *entry.Field `json:"log_name_field,omitempty" yaml:"log_name_field,omitempty"`
TraceField *entry.Field `json:"trace_field,omitempty" yaml:"trace_field,omitempty"`
SpanIDField *entry.Field `json:"span_id_field,omitempty" yaml:"span_id_field,omitempty"`
Timeout helper.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"`
UseCompression bool `json:"use_compression,omitempty" yaml:"use_compression,omitempty"`
}

// Build will build a google cloud output operator.
Expand Down
4 changes: 2 additions & 2 deletions operator/builtin/output/googlecloud/google_cloud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/golang/protobuf/ptypes"
tspb "github.com/golang/protobuf/ptypes/timestamp"
"github.com/observiq/stanza/entry"
"github.com/observiq/stanza/operator"
"github.com/observiq/stanza/operator/helper"
"github.com/observiq/stanza/testutil"
"github.com/stretchr/testify/require"
"google.golang.org/api/option"
Expand All @@ -33,7 +33,7 @@ type googleCloudTestCase struct {
func googleCloudBasicConfig() *GoogleCloudOutputConfig {
cfg := NewGoogleCloudOutputConfig("test_id")
cfg.ProjectID = "test_project_id"
cfg.BufferConfig.DelayThreshold = operator.Duration{Duration: time.Millisecond}
cfg.BufferConfig.DelayThreshold = helper.Duration{Duration: time.Millisecond}
return cfg
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,18 @@ func NewK8sMetadataDecoratorConfig(operatorID string) *K8sMetadataDecoratorConfi
TransformerConfig: helper.NewTransformerConfig(operatorID, "k8s_metadata_decorator"),
PodNameField: entry.NewResourceField("k8s.pod.name"),
NamespaceField: entry.NewResourceField("k8s.namespace.name"),
CacheTTL: operator.Duration{Duration: 10 * time.Minute},
Timeout: operator.Duration{Duration: 10 * time.Second},
CacheTTL: helper.Duration{Duration: 10 * time.Minute},
Timeout: helper.Duration{Duration: 10 * time.Second},
}
}

// K8sMetadataDecoratorConfig is the configuration of k8s_metadata_decorator operator
type K8sMetadataDecoratorConfig struct {
helper.TransformerConfig `yaml:",inline"`
PodNameField entry.Field `json:"pod_name_field,omitempty" yaml:"pod_name_field,omitempty"`
NamespaceField entry.Field `json:"namespace_field,omitempty" yaml:"namespace_field,omitempty"`
CacheTTL operator.Duration `json:"cache_ttl,omitempty" yaml:"cache_ttl,omitempty"`
Timeout operator.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"`
PodNameField entry.Field `json:"pod_name_field,omitempty" yaml:"pod_name_field,omitempty"`
NamespaceField entry.Field `json:"namespace_field,omitempty" yaml:"namespace_field,omitempty"`
CacheTTL helper.Duration `json:"cache_ttl,omitempty" yaml:"cache_ttl,omitempty"`
Timeout helper.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"`
}

// Build will build a k8s_metadata_decorator operator from the supplied configuration
Expand Down
6 changes: 3 additions & 3 deletions operator/builtin/transformer/ratelimit/rate_limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ func NewRateLimitConfig(operatorID string) *RateLimitConfig {
type RateLimitConfig struct {
helper.TransformerConfig `yaml:",inline"`

Rate float64 `json:"rate,omitempty" yaml:"rate,omitempty"`
Interval operator.Duration `json:"interval,omitempty" yaml:"interval,omitempty"`
Burst uint `json:"burst,omitempty" yaml:"burst,omitempty"`
Rate float64 `json:"rate,omitempty" yaml:"rate,omitempty"`
Interval helper.Duration `json:"interval,omitempty" yaml:"interval,omitempty"`
Burst uint `json:"burst,omitempty" yaml:"burst,omitempty"`
}

// Build will build a rate limit operator.
Expand Down
2 changes: 1 addition & 1 deletion operator/duration.go → operator/helper/duration.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package operator
package helper

import (
"encoding/json"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package operator
package helper

import (
"encoding/json"
Expand Down

0 comments on commit e19f879

Please sign in to comment.