diff --git a/operator/buffer/buffer.go b/operator/buffer/buffer.go index d4303c0fc..6f3429d98 100644 --- a/operator/buffer/buffer.go +++ b/operator/buffer/buffer.go @@ -7,7 +7,7 @@ import ( "github.com/observiq/stanza/entry" "github.com/observiq/stanza/errors" - "github.com/observiq/stanza/operator" + "github.com/observiq/stanza/operator/helper" ) // Buffer is an entity that buffers log entries to an operator @@ -23,7 +23,7 @@ type Buffer interface { func NewConfig() Config { return Config{ BufferType: "memory", - DelayThreshold: operator.Duration{Duration: time.Second}, + DelayThreshold: helper.Duration{Duration: time.Second}, BundleCountThreshold: 10_000, BundleByteThreshold: 4 * 1024 * 1024 * 1024, // 4MB BundleByteLimit: 4 * 1024 * 1024 * 1024, // 4MB @@ -35,14 +35,14 @@ func NewConfig() Config { // Config is the configuration of a buffer type Config struct { - BufferType string `json:"type,omitempty" yaml:"type,omitempty"` - DelayThreshold operator.Duration `json:"delay_threshold,omitempty" yaml:"delay_threshold,omitempty"` - BundleCountThreshold int `json:"bundle_count_threshold,omitempty" yaml:"buffer_count_threshold,omitempty"` - BundleByteThreshold int `json:"bundle_byte_threshold,omitempty" yaml:"bundle_byte_threshold,omitempty"` - BundleByteLimit int `json:"bundle_byte_limit,omitempty" yaml:"bundle_byte_limit,omitempty"` - BufferedByteLimit int `json:"buffered_byte_limit,omitempty" yaml:"buffered_byte_limit,omitempty"` - HandlerLimit int `json:"handler_limit,omitempty" yaml:"handler_limit,omitempty"` - Retry RetryConfig `json:"retry,omitempty" yaml:"retry,omitempty"` + BufferType string `json:"type,omitempty" yaml:"type,omitempty"` + DelayThreshold helper.Duration `json:"delay_threshold,omitempty" yaml:"delay_threshold,omitempty"` + BundleCountThreshold int `json:"bundle_count_threshold,omitempty" yaml:"buffer_count_threshold,omitempty"` + BundleByteThreshold int `json:"bundle_byte_threshold,omitempty" yaml:"bundle_byte_threshold,omitempty"` + BundleByteLimit int `json:"bundle_byte_limit,omitempty" yaml:"bundle_byte_limit,omitempty"` + BufferedByteLimit int `json:"buffered_byte_limit,omitempty" yaml:"buffered_byte_limit,omitempty"` + HandlerLimit int `json:"handler_limit,omitempty" yaml:"handler_limit,omitempty"` + Retry RetryConfig `json:"retry,omitempty" yaml:"retry,omitempty"` } // Build will build a buffer from the supplied configuration @@ -61,18 +61,18 @@ func (config *Config) Build() (Buffer, error) { // NewRetryConfig creates a new retry config func NewRetryConfig() RetryConfig { return RetryConfig{ - InitialInterval: operator.Duration{Duration: 500 * time.Millisecond}, + InitialInterval: helper.Duration{Duration: 500 * time.Millisecond}, RandomizationFactor: 0.5, Multiplier: 1.5, - MaxInterval: operator.Duration{Duration: 15 * time.Minute}, + MaxInterval: helper.Duration{Duration: 15 * time.Minute}, } } // RetryConfig is the configuration of an entity that will retry processing after an error type RetryConfig struct { - InitialInterval operator.Duration `json:"initial_interval,omitempty" yaml:"initial_interval,omitempty"` - RandomizationFactor float64 `json:"randomization_factor,omitempty" yaml:"randomization_factor,omitempty"` - Multiplier float64 `json:"multiplier,omitempty" yaml:"multiplier,omitempty"` - MaxInterval operator.Duration `json:"max_interval,omitempty" yaml:"max_interval,omitempty"` - MaxElapsedTime operator.Duration `json:"max_elapsed_time,omitempty" yaml:"max_elapsed_time,omitempty"` + InitialInterval helper.Duration `json:"initial_interval,omitempty" yaml:"initial_interval,omitempty"` + RandomizationFactor float64 `json:"randomization_factor,omitempty" yaml:"randomization_factor,omitempty"` + Multiplier float64 `json:"multiplier,omitempty" yaml:"multiplier,omitempty"` + MaxInterval helper.Duration `json:"max_interval,omitempty" yaml:"max_interval,omitempty"` + MaxElapsedTime helper.Duration `json:"max_elapsed_time,omitempty" yaml:"max_elapsed_time,omitempty"` } diff --git a/operator/buffer/buffer_test.go b/operator/buffer/buffer_test.go index eb8028888..469001637 100644 --- a/operator/buffer/buffer_test.go +++ b/operator/buffer/buffer_test.go @@ -8,7 +8,7 @@ import ( "time" "github.com/observiq/stanza/entry" - "github.com/observiq/stanza/operator" + "github.com/observiq/stanza/operator/helper" "github.com/stretchr/testify/require" "go.uber.org/zap" yaml "gopkg.in/yaml.v2" @@ -34,7 +34,7 @@ func (b *bufferHandler) Logger() *zap.SugaredLogger { func TestBuffer(t *testing.T) { config := NewConfig() - config.DelayThreshold = operator.Duration{ + config.DelayThreshold = helper.Duration{ Duration: 100 * time.Millisecond, } diff --git a/operator/buffer/memory_buffer_test.go b/operator/buffer/memory_buffer_test.go index 5f4194e63..2b3df97f3 100644 --- a/operator/buffer/memory_buffer_test.go +++ b/operator/buffer/memory_buffer_test.go @@ -7,7 +7,7 @@ import ( "time" "github.com/observiq/stanza/entry" - "github.com/observiq/stanza/operator" + "github.com/observiq/stanza/operator/helper" "github.com/stretchr/testify/require" "go.uber.org/zap" "go.uber.org/zap/zaptest" @@ -47,7 +47,7 @@ func newMockHandler(t *testing.T) *mockHandler { func TestMemoryBufferRetry(t *testing.T) { t.Run("FailOnce", func(t *testing.T) { cfg := NewConfig() - cfg.DelayThreshold = operator.Duration{Duration: 10 * time.Millisecond} + cfg.DelayThreshold = helper.Duration{Duration: 10 * time.Millisecond} buffer, err := cfg.Build() require.NoError(t, err) handler := newMockHandler(t) @@ -68,7 +68,7 @@ func TestMemoryBufferRetry(t *testing.T) { t.Run("ContextCancelled", func(t *testing.T) { cfg := NewConfig() - cfg.DelayThreshold = operator.Duration{Duration: 10 * time.Millisecond} + cfg.DelayThreshold = helper.Duration{Duration: 10 * time.Millisecond} buffer, err := cfg.Build() require.NoError(t, err) handler := newMockHandler(t) @@ -95,8 +95,8 @@ func TestMemoryBufferRetry(t *testing.T) { t.Run("ExceededLimit", func(t *testing.T) { cfg := NewConfig() - cfg.DelayThreshold = operator.Duration{Duration: 10 * time.Millisecond} - cfg.Retry.MaxElapsedTime = operator.Duration{Duration: time.Nanosecond} + cfg.DelayThreshold = helper.Duration{Duration: 10 * time.Millisecond} + cfg.Retry.MaxElapsedTime = helper.Duration{Duration: time.Nanosecond} buffer, err := cfg.Build() require.NoError(t, err) handler := newMockHandler(t) @@ -124,7 +124,7 @@ func TestMemoryBufferRetry(t *testing.T) { func TestMemoryBufferFlush(t *testing.T) { t.Run("Simple", func(t *testing.T) { cfg := NewConfig() - cfg.DelayThreshold = operator.Duration{Duration: 10 * time.Hour} + cfg.DelayThreshold = helper.Duration{Duration: 10 * time.Hour} buffer, err := cfg.Build() require.NoError(t, err) handler := newMockHandler(t) @@ -156,7 +156,7 @@ func TestMemoryBufferFlush(t *testing.T) { t.Run("ContextCancelled", func(t *testing.T) { cfg := NewConfig() - cfg.DelayThreshold = operator.Duration{Duration: 10 * time.Hour} + cfg.DelayThreshold = helper.Duration{Duration: 10 * time.Hour} buffer, err := cfg.Build() require.NoError(t, err) handler := newMockHandler(t) diff --git a/operator/builtin/input/file/file.go b/operator/builtin/input/file/file.go index b0e94de2d..75ddb00dc 100644 --- a/operator/builtin/input/file/file.go +++ b/operator/builtin/input/file/file.go @@ -32,7 +32,7 @@ func init() { func NewInputConfig(operatorID string) *InputConfig { return &InputConfig{ InputConfig: helper.NewInputConfig(operatorID, "file_input"), - PollInterval: operator.Duration{Duration: 200 * time.Millisecond}, + PollInterval: helper.Duration{Duration: 200 * time.Millisecond}, IncludeFileName: true, IncludeFilePath: false, StartAt: "end", @@ -48,13 +48,13 @@ type InputConfig struct { Include []string `json:"include,omitempty" yaml:"include,omitempty"` Exclude []string `json:"exclude,omitempty" yaml:"exclude,omitempty"` - PollInterval operator.Duration `json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"` - Multiline *MultilineConfig `json:"multiline,omitempty" yaml:"multiline,omitempty"` - IncludeFileName bool `json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"` - IncludeFilePath bool `json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"` - StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"` - MaxLogSize int `json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"` - Encoding string `json:"encoding,omitempty" yaml:"encoding,omitempty"` + PollInterval helper.Duration `json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"` + Multiline *MultilineConfig `json:"multiline,omitempty" yaml:"multiline,omitempty"` + IncludeFileName bool `json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"` + IncludeFilePath bool `json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"` + StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"` + MaxLogSize int `json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"` + Encoding string `json:"encoding,omitempty" yaml:"encoding,omitempty"` } // MultilineConfig is the configuration a multiline operation diff --git a/operator/builtin/input/file/file_test.go b/operator/builtin/input/file/file_test.go index 6bac6c827..0ccdcd4fd 100644 --- a/operator/builtin/input/file/file_test.go +++ b/operator/builtin/input/file/file_test.go @@ -14,6 +14,7 @@ import ( "github.com/observiq/stanza/entry" "github.com/observiq/stanza/operator" + "github.com/observiq/stanza/operator/helper" "github.com/observiq/stanza/testutil" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -27,7 +28,7 @@ func newTestFileSource(t *testing.T) (*InputOperator, chan *entry.Entry) { }) cfg := NewInputConfig("testfile") - cfg.PollInterval = operator.Duration{Duration: 50 * time.Millisecond} + cfg.PollInterval = helper.Duration{Duration: 50 * time.Millisecond} cfg.StartAt = "beginning" cfg.Include = []string{"should-be-overwritten"} @@ -50,7 +51,7 @@ func TestFileSource_Build(t *testing.T) { cfg.OutputIDs = []string{"mock"} cfg.Include = []string{"/var/log/testpath.*"} cfg.Exclude = []string{"/var/log/testpath.ex*"} - cfg.PollInterval = operator.Duration{Duration: 10 * time.Millisecond} + cfg.PollInterval = helper.Duration{Duration: 10 * time.Millisecond} return cfg } diff --git a/operator/builtin/input/windows/operator.go b/operator/builtin/input/windows/operator.go index 69926f22a..388920d5f 100644 --- a/operator/builtin/input/windows/operator.go +++ b/operator/builtin/input/windows/operator.go @@ -19,10 +19,10 @@ func init() { // EventLogConfig is the configuration of a windows event log operator. type EventLogConfig struct { helper.InputConfig `yaml:",inline"` - Channel string `json:"channel" yaml:"channel"` - MaxReads int `json:"max_reads,omitempty" yaml:"max_reads,omitempty"` - StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"` - PollInterval operator.Duration `json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"` + Channel string `json:"channel" yaml:"channel"` + MaxReads int `json:"max_reads,omitempty" yaml:"max_reads,omitempty"` + StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"` + PollInterval helper.Duration `json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"` } // Build will build a windows event log operator. @@ -64,7 +64,7 @@ func NewDefaultConfig() operator.Builder { InputConfig: helper.NewInputConfig("", "windows_eventlog_input"), MaxReads: 100, StartAt: "end", - PollInterval: operator.Duration{ + PollInterval: helper.Duration{ Duration: 1 * time.Second, }, } @@ -79,7 +79,7 @@ type EventLogInput struct { channel string maxReads int startAt string - pollInterval operator.Duration + pollInterval helper.Duration offsets helper.Persister cancel context.CancelFunc wg *sync.WaitGroup diff --git a/operator/builtin/output/googlecloud/google_cloud.go b/operator/builtin/output/googlecloud/google_cloud.go index 7751a7ab9..ea6188f5c 100644 --- a/operator/builtin/output/googlecloud/google_cloud.go +++ b/operator/builtin/output/googlecloud/google_cloud.go @@ -33,7 +33,7 @@ func NewGoogleCloudOutputConfig(operatorID string) *GoogleCloudOutputConfig { return &GoogleCloudOutputConfig{ OutputConfig: helper.NewOutputConfig(operatorID, "google_cloud_output"), BufferConfig: buffer.NewConfig(), - Timeout: operator.Duration{Duration: 30 * time.Second}, + Timeout: helper.Duration{Duration: 30 * time.Second}, UseCompression: true, } } @@ -43,14 +43,14 @@ type GoogleCloudOutputConfig struct { helper.OutputConfig `yaml:",inline"` BufferConfig buffer.Config `json:"buffer,omitempty" yaml:"buffer,omitempty"` - Credentials string `json:"credentials,omitempty" yaml:"credentials,omitempty"` - CredentialsFile string `json:"credentials_file,omitempty" yaml:"credentials_file,omitempty"` - ProjectID string `json:"project_id" yaml:"project_id"` - LogNameField *entry.Field `json:"log_name_field,omitempty" yaml:"log_name_field,omitempty"` - TraceField *entry.Field `json:"trace_field,omitempty" yaml:"trace_field,omitempty"` - SpanIDField *entry.Field `json:"span_id_field,omitempty" yaml:"span_id_field,omitempty"` - Timeout operator.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"` - UseCompression bool `json:"use_compression,omitempty" yaml:"use_compression,omitempty"` + Credentials string `json:"credentials,omitempty" yaml:"credentials,omitempty"` + CredentialsFile string `json:"credentials_file,omitempty" yaml:"credentials_file,omitempty"` + ProjectID string `json:"project_id" yaml:"project_id"` + LogNameField *entry.Field `json:"log_name_field,omitempty" yaml:"log_name_field,omitempty"` + TraceField *entry.Field `json:"trace_field,omitempty" yaml:"trace_field,omitempty"` + SpanIDField *entry.Field `json:"span_id_field,omitempty" yaml:"span_id_field,omitempty"` + Timeout helper.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"` + UseCompression bool `json:"use_compression,omitempty" yaml:"use_compression,omitempty"` } // Build will build a google cloud output operator. diff --git a/operator/builtin/output/googlecloud/google_cloud_test.go b/operator/builtin/output/googlecloud/google_cloud_test.go index b2bdb6422..a6c947fc8 100644 --- a/operator/builtin/output/googlecloud/google_cloud_test.go +++ b/operator/builtin/output/googlecloud/google_cloud_test.go @@ -13,7 +13,7 @@ import ( "github.com/golang/protobuf/ptypes" tspb "github.com/golang/protobuf/ptypes/timestamp" "github.com/observiq/stanza/entry" - "github.com/observiq/stanza/operator" + "github.com/observiq/stanza/operator/helper" "github.com/observiq/stanza/testutil" "github.com/stretchr/testify/require" "google.golang.org/api/option" @@ -33,7 +33,7 @@ type googleCloudTestCase struct { func googleCloudBasicConfig() *GoogleCloudOutputConfig { cfg := NewGoogleCloudOutputConfig("test_id") cfg.ProjectID = "test_project_id" - cfg.BufferConfig.DelayThreshold = operator.Duration{Duration: time.Millisecond} + cfg.BufferConfig.DelayThreshold = helper.Duration{Duration: time.Millisecond} return cfg } diff --git a/operator/builtin/transformer/k8smetadata/k8s_metadata_decorator.go b/operator/builtin/transformer/k8smetadata/k8s_metadata_decorator.go index 189220afc..76194fd31 100644 --- a/operator/builtin/transformer/k8smetadata/k8s_metadata_decorator.go +++ b/operator/builtin/transformer/k8smetadata/k8s_metadata_decorator.go @@ -24,18 +24,18 @@ func NewK8sMetadataDecoratorConfig(operatorID string) *K8sMetadataDecoratorConfi TransformerConfig: helper.NewTransformerConfig(operatorID, "k8s_metadata_decorator"), PodNameField: entry.NewResourceField("k8s.pod.name"), NamespaceField: entry.NewResourceField("k8s.namespace.name"), - CacheTTL: operator.Duration{Duration: 10 * time.Minute}, - Timeout: operator.Duration{Duration: 10 * time.Second}, + CacheTTL: helper.Duration{Duration: 10 * time.Minute}, + Timeout: helper.Duration{Duration: 10 * time.Second}, } } // K8sMetadataDecoratorConfig is the configuration of k8s_metadata_decorator operator type K8sMetadataDecoratorConfig struct { helper.TransformerConfig `yaml:",inline"` - PodNameField entry.Field `json:"pod_name_field,omitempty" yaml:"pod_name_field,omitempty"` - NamespaceField entry.Field `json:"namespace_field,omitempty" yaml:"namespace_field,omitempty"` - CacheTTL operator.Duration `json:"cache_ttl,omitempty" yaml:"cache_ttl,omitempty"` - Timeout operator.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"` + PodNameField entry.Field `json:"pod_name_field,omitempty" yaml:"pod_name_field,omitempty"` + NamespaceField entry.Field `json:"namespace_field,omitempty" yaml:"namespace_field,omitempty"` + CacheTTL helper.Duration `json:"cache_ttl,omitempty" yaml:"cache_ttl,omitempty"` + Timeout helper.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"` } // Build will build a k8s_metadata_decorator operator from the supplied configuration diff --git a/operator/builtin/transformer/ratelimit/rate_limit.go b/operator/builtin/transformer/ratelimit/rate_limit.go index 42056096e..992da702e 100644 --- a/operator/builtin/transformer/ratelimit/rate_limit.go +++ b/operator/builtin/transformer/ratelimit/rate_limit.go @@ -25,9 +25,9 @@ func NewRateLimitConfig(operatorID string) *RateLimitConfig { type RateLimitConfig struct { helper.TransformerConfig `yaml:",inline"` - Rate float64 `json:"rate,omitempty" yaml:"rate,omitempty"` - Interval operator.Duration `json:"interval,omitempty" yaml:"interval,omitempty"` - Burst uint `json:"burst,omitempty" yaml:"burst,omitempty"` + Rate float64 `json:"rate,omitempty" yaml:"rate,omitempty"` + Interval helper.Duration `json:"interval,omitempty" yaml:"interval,omitempty"` + Burst uint `json:"burst,omitempty" yaml:"burst,omitempty"` } // Build will build a rate limit operator. diff --git a/operator/duration.go b/operator/helper/duration.go similarity index 98% rename from operator/duration.go rename to operator/helper/duration.go index e53d8dbbc..3cd5dd652 100644 --- a/operator/duration.go +++ b/operator/helper/duration.go @@ -1,4 +1,4 @@ -package operator +package helper import ( "encoding/json" diff --git a/operator/duration_test.go b/operator/helper/duration_test.go similarity index 98% rename from operator/duration_test.go rename to operator/helper/duration_test.go index 8c5c85d3d..9d2461283 100644 --- a/operator/duration_test.go +++ b/operator/helper/duration_test.go @@ -1,4 +1,4 @@ -package operator +package helper import ( "encoding/json"