Skip to content

Commit

Permalink
Add factory and new-style config for Queued processor (#47)
Browse files Browse the repository at this point in the history
This is part of remaining migration to new configuration format.

Github issue: #46

Testing done: make
  • Loading branch information
tigrannajaryan authored and Paulo Janotti committed Jun 25, 2019
1 parent 6566e78 commit 38f1390
Show file tree
Hide file tree
Showing 6 changed files with 234 additions and 1 deletion.
4 changes: 3 additions & 1 deletion cmd/occollector/app/collector/testdata/otelsvc-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ exporters:
processors:
attributes:
enabled: true
queued-retry:
enabled: true

pipelines:
traces:
receivers: [jaeger]
processors: [attributes]
processors: [attributes, queued-retry]
exporters: [opencensus]
35 changes: 35 additions & 0 deletions internal/collector/processor/queued/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package queued

import (
"time"

"github.com/open-telemetry/opentelemetry-service/internal/configmodels"
)

// ConfigV2 defines configuration for Attributes processor.
type ConfigV2 struct {
configmodels.ProcessorSettings `mapstructure:",squash"`

// NumWorkers is the number of queue workers that dequeue batches and send them out.
NumWorkers int `mapstructure:"num-workers"`
// QueueSize is the maximum number of batches allowed in queue at a given time.
QueueSize int `mapstructure:"queue-size"`
// Retry indicates whether queue processor should retry span batches in case of processing failure.
RetryOnFailure bool `mapstructure:"retry-on-failure"`
// BackoffDelay is the amount of time a worker waits after a failed send before retrying.
BackoffDelay time.Duration `mapstructure:"backoff-delay"`
}
55 changes: 55 additions & 0 deletions internal/collector/processor/queued/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package queued

import (
"path"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-service/internal/configmodels"
"github.com/open-telemetry/opentelemetry-service/internal/configv2"
"github.com/open-telemetry/opentelemetry-service/internal/factories"
)

var _ = configv2.RegisterTestFactories()

func TestLoadConfig(t *testing.T) {

factory := factories.GetProcessorFactory(typeStr)

config, err := configv2.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"))

require.Nil(t, err)
require.NotNil(t, config)

p0 := config.Processors["queued-retry"]
assert.Equal(t, p0, factory.CreateDefaultConfig())

p1 := config.Processors["queued-retry/2"]
assert.Equal(t, p1,
&ConfigV2{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "queued-retry",
},
NumWorkers: 2,
QueueSize: 10,
RetryOnFailure: true,
BackoffDelay: time.Second * 5,
})
}
75 changes: 75 additions & 0 deletions internal/collector/processor/queued/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package queued

import (
"time"

"github.com/open-telemetry/opentelemetry-service/consumer"
"github.com/open-telemetry/opentelemetry-service/internal/configmodels"
"github.com/open-telemetry/opentelemetry-service/internal/factories"
"github.com/open-telemetry/opentelemetry-service/processor"
)

var _ = factories.RegisterProcessorFactory(&processorFactory{})

const (
// The value of "type" key in configuration.
typeStr = "queued-retry"
)

// processorFactory is the factory for OpenCensus exporter.
type processorFactory struct {
}

// Type gets the type of the Option config created by this factory.
func (f *processorFactory) Type() string {
return typeStr
}

// CreateDefaultConfig creates the default configuration for exporter.
func (f *processorFactory) CreateDefaultConfig() configmodels.Processor {
return &ConfigV2{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: typeStr,
},
NumWorkers: 10,
QueueSize: 5000,
RetryOnFailure: true,
BackoffDelay: time.Second * 5,
}
}

// CreateTraceProcessor creates a trace processor based on this config.
func (f *processorFactory) CreateTraceProcessor(
nextConsumer consumer.TraceConsumer,
cfg configmodels.Processor,
) (processor.TraceProcessor, error) {
oCfg := cfg.(*ConfigV2)
return NewQueuedSpanProcessor(nextConsumer,
Options.WithNumWorkers(oCfg.NumWorkers),
Options.WithQueueSize(oCfg.QueueSize),
Options.WithRetryOnProcessingFailures(oCfg.RetryOnFailure),
Options.WithBackoffDelay(oCfg.BackoffDelay),
), nil
}

// CreateMetricsProcessor creates a metrics processor based on this config.
func (f *processorFactory) CreateMetricsProcessor(
nextConsumer consumer.MetricsConsumer,
cfg configmodels.Processor,
) (processor.MetricsProcessor, error) {
return nil, factories.ErrDataTypeIsNotSupported
}
47 changes: 47 additions & 0 deletions internal/collector/processor/queued/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package queued

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-service/internal/factories"
)

func TestCreateDefaultConfig(t *testing.T) {
factory := factories.GetProcessorFactory(typeStr)
require.NotNil(t, factory)

cfg := factory.CreateDefaultConfig()
assert.NotNil(t, cfg, "failed to create default config")
}

func TestCreateProcessor(t *testing.T) {
factory := factories.GetProcessorFactory(typeStr)
require.NotNil(t, factory)

cfg := factory.CreateDefaultConfig()

tp, err := factory.CreateTraceProcessor(nil, cfg)
assert.NotNil(t, tp)
assert.NoError(t, err, "cannot create trace processor")

mp, err := factory.CreateMetricsProcessor(nil, cfg)
assert.Nil(t, mp)
assert.Error(t, err, "should not be able to create metric processor")
}
19 changes: 19 additions & 0 deletions internal/collector/processor/queued/testdata/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
receivers:
examplereceiver:

processors:
queued-retry:
queued-retry/2:
num-workers: 2
queue-size: 10
retry-on-failure: true
backoff-delay: 5s

exporters:
exampleexporter:

pipelines:
traces:
receivers: [examplereceiver]
processors: [queued-retry/2]
exporters: [exampleexporter]

0 comments on commit 38f1390

Please sign in to comment.