Skip to content

Commit

Permalink
[processor/routing] Expand error handling on failure to build exporte…
Browse files Browse the repository at this point in the history
…rs (open-telemetry#8125)

* expand error handling on failure to build exporters

* update changelog

* add testcase scenario

* run gofmt and add shutdown

* fix imports
  • Loading branch information
jdgeisler authored and tomsanbear committed Mar 2, 2022
1 parent a690209 commit 35ec590
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
- `internal/stanza`: Add support for arbitrary attribute types (#8081)
- `resourcedetectionprocessor`: Add confighttp.HTTPClientSettings To Resource Detection Config Fixes (#7397)
- `honeycombexporter`: Add validation for `sending_queue` setting (#8113)
- `routingprocessor`: Expand error handling on failure to build exporters (#8125)

### 🛑 Breaking changes 🛑

Expand Down
56 changes: 56 additions & 0 deletions processor/routingprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,21 @@ package routingprocessor

import (
"context"
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/exporter/otlpexporter"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/processor/processorhelper"
"go.opentelemetry.io/collector/service/servicetest"
"go.uber.org/zap"
)

func TestProcessorGetsCreatedWithValidConfiguration(t *testing.T) {
Expand Down Expand Up @@ -136,6 +142,56 @@ func TestShouldNotFailWhenNextIsProcessor(t *testing.T) {
assert.NotNil(t, exp)
}

func TestProcessorDoesNotFailToBuildExportersWithMultiplePipelines(t *testing.T) {
// prepare
factories, err := componenttest.NopFactories()
assert.NoError(t, err)

processorFactory := NewFactory()
factories.Processors[typeStr] = processorFactory

otlpExporterFactory := otlpexporter.NewFactory()
factories.Exporters["otlp"] = otlpExporterFactory

otlpConfig := &otlpexporter.Config{
ExporterSettings: config.NewExporterSettings(config.NewComponentID("otlp")),
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: "example.com:1234",
},
}

otlpTracesExporter, err := otlpExporterFactory.CreateTracesExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), otlpConfig)
require.NoError(t, err)

otlpMetricsExporter, err := otlpExporterFactory.CreateMetricsExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), otlpConfig)
require.NoError(t, err)

host := &mockHost{
Host: componenttest.NewNopHost(),
GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter {
return map[config.DataType]map[config.ComponentID]component.Exporter{
config.TracesDataType: {
config.NewComponentID("otlp/traces"): otlpTracesExporter,
},
config.MetricsDataType: {
config.NewComponentID("otlp/metrics"): otlpMetricsExporter,
},
}
},
}

cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", "config_multipipelines.yaml"), factories)
assert.NoError(t, err)

for _, cfg := range cfg.Processors {
exp := newProcessor(zap.NewNop(), cfg)
err = exp.Start(context.Background(), host)
// assert that no error is thrown due to multiple pipelines and exporters not using the routing processor
assert.NoError(t, err)
assert.NoError(t, exp.Shutdown(context.Background()))
}
}

func TestShutdown(t *testing.T) {
// prepare
factory := NewFactory()
Expand Down
4 changes: 2 additions & 2 deletions processor/routingprocessor/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,8 @@ func (r *router) registerExporters(hostExporters map[config.DataType]map[config.
},
} {
if err := reg.registerFunc(hostExporters[reg.typ]); err != nil {
if errors.Is(err, errDefaultExporterNotFound) {
r.logger.Warn("can't find the default exporter for the routing processor for this pipeline type. This is OK if you did not specify this processor for that pipeline type",
if errors.Is(err, errDefaultExporterNotFound) || errors.Is(err, errExporterNotFound) {
r.logger.Warn("can't find the exporter for the routing processor for this pipeline type. This is OK if you did not specify this processor for that pipeline type",
zap.Any("pipeline_type", reg.typ),
zap.Error(err),
)
Expand Down
31 changes: 31 additions & 0 deletions processor/routingprocessor/testdata/config_multipipelines.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
receivers:
nop:

processors:
routing:
attribute_source: resource
from_attribute: X-Tenant
table:
- value: acme
exporters:
- otlp/traces

exporters:
otlp/metrics:
otlp/traces:

service:
pipelines:
traces:
receivers:
- nop
processors:
- routing
exporters:
- otlp/traces
metrics:
receivers:
- nop
processors: []
exporters:
- otlp/metrics

0 comments on commit 35ec590

Please sign in to comment.