Skip to content

Commit

Permalink
Add tests for the new pipelines builder (#5544)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Jun 20, 2022
1 parent e734550 commit 35b844e
Show file tree
Hide file tree
Showing 30 changed files with 585 additions and 170 deletions.
31 changes: 0 additions & 31 deletions service/internal/builder/exporters_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package builder

import (
"context"
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -26,7 +25,6 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/internal/testcomponents"
"go.opentelemetry.io/collector/service/servicetest"
)

func TestBuildExporters(t *testing.T) {
Expand Down Expand Up @@ -114,32 +112,3 @@ func TestBuildExportersStartStopAll(t *testing.T) {
assert.True(t, metricExporter.Stopped)
assert.True(t, logsExporter.Stopped)
}

func TestBuildExportersNotSupportedDataType(t *testing.T) {
factories := createTestFactories()

tests := []struct {
configFile string
}{
{
configFile: "not_supported_exporter_logs.yaml",
},
{
configFile: "not_supported_exporter_metrics.yaml",
},
{
configFile: "not_supported_exporter_traces.yaml",
},
}

for _, test := range tests {
t.Run(test.configFile, func(t *testing.T) {

cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", test.configFile), factories)
require.Nil(t, err)

_, err = BuildExporters(context.Background(), componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, factories.Exporters)
assert.Error(t, err)
})
}
}
10 changes: 5 additions & 5 deletions service/internal/builder/pipelines_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type builtPipeline struct {
// can mutate the TraceData or MetricsData input argument.
MutatesData bool

processors []component.Processor
Processors []component.Processor
}

// BuiltPipelines is a map of build pipelines created from pipeline configs.
Expand All @@ -57,8 +57,8 @@ func (bps BuiltPipelines) StartProcessors(ctx context.Context, host component.Ho
// This is important so that processors that are earlier in the pipeline and
// reference processors that are later in the pipeline do not start sending
// data to later pipelines which are not yet started.
for i := len(bp.processors) - 1; i >= 0; i-- {
if err := bp.processors[i].Start(ctx, hostWrapper); err != nil {
for i := len(bp.Processors) - 1; i >= 0; i-- {
if err := bp.Processors[i].Start(ctx, hostWrapper); err != nil {
return err
}
}
Expand All @@ -71,7 +71,7 @@ func (bps BuiltPipelines) ShutdownProcessors(ctx context.Context) error {
var errs error
for _, bp := range bps {
bp.logger.Info("Pipeline is shutting down...")
for _, p := range bp.processors {
for _, p := range bp.Processors {
errs = multierr.Append(errs, p.Shutdown(ctx))
}
bp.logger.Info("Pipeline is shutdown.")
Expand Down Expand Up @@ -243,7 +243,7 @@ func (pb *pipelinesBuilder) buildPipeline(ctx context.Context, pipelineID config
firstLC: lc,
Config: pipelineCfg,
MutatesData: mutatesConsumedData,
processors: processors,
Processors: processors,
}

return bp, nil
Expand Down
33 changes: 0 additions & 33 deletions service/internal/builder/pipelines_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,36 +228,3 @@ func testPipeline(t *testing.T, pipelineID config.ComponentID, exporterIDs []con
err = pipelineProcessors.ShutdownProcessors(context.Background())
assert.NoError(t, err)
}

func TestBuildPipelines_NotSupportedDataType(t *testing.T) {
factories := createTestFactories()

tests := []struct {
configFile string
}{
{
configFile: "not_supported_processor_logs.yaml",
},
{
configFile: "not_supported_processor_metrics.yaml",
},
{
configFile: "not_supported_processor_traces.yaml",
},
}

for _, test := range tests {
t.Run(test.configFile, func(t *testing.T) {

cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", test.configFile), factories)
require.Nil(t, err)

allExporters, err := BuildExporters(context.Background(), componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, factories.Exporters)
assert.NoError(t, err)

pipelineProcessors, err := BuildPipelines(componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, allExporters, factories.Processors)
assert.Error(t, err)
assert.Zero(t, len(pipelineProcessors))
})
}
}
14 changes: 7 additions & 7 deletions service/internal/builder/receivers_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,17 @@ var errUnusedReceiver = errors.New("receiver defined but not used by any pipelin
// a trace and/or a metrics component.
type builtReceiver struct {
logger *zap.Logger
receiver component.Receiver
Receiver component.Receiver
}

// Start starts the receiver.
func (rcv *builtReceiver) Start(ctx context.Context, host component.Host) error {
return rcv.receiver.Start(ctx, components.NewHostWrapper(host, rcv.logger))
return rcv.Receiver.Start(ctx, components.NewHostWrapper(host, rcv.logger))
}

// Shutdown stops the receiver.
func (rcv *builtReceiver) Shutdown(ctx context.Context) error {
return rcv.receiver.Shutdown(ctx)
return rcv.Receiver.Shutdown(ctx)
}

// Receivers is a map of receivers created from receiver configs.
Expand Down Expand Up @@ -207,11 +207,11 @@ func attachReceiverToPipelines(
return fmt.Errorf("factory for %v produced a nil receiver", id)
}

if rcv.receiver != nil {
if rcv.Receiver != nil {
// The receiver was previously created for this config. This can happen if the
// same receiver type supports more than one data type. In that case we expect
// that CreateTracesReceiver and CreateMetricsReceiver return the same value.
if rcv.receiver != createdReceiver {
if rcv.Receiver != createdReceiver {
return fmt.Errorf(
"factory for %q is implemented incorrectly: "+
"CreateTracesReceiver, CreateMetricsReceiver and CreateLogsReceiver must return "+
Expand All @@ -220,7 +220,7 @@ func attachReceiverToPipelines(
)
}
}
rcv.receiver = createdReceiver
rcv.Receiver = createdReceiver

set.Logger.Info("Receiver was built.", zap.String("datatype", string(dataType)))

Expand Down Expand Up @@ -258,7 +258,7 @@ func (rb *receiversBuilder) buildReceiver(ctx context.Context, set component.Rec
}
}

if rcv.receiver == nil {
if rcv.Receiver == nil {
return nil, errUnusedReceiver
}

Expand Down
49 changes: 6 additions & 43 deletions service/internal/builder/receivers_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func testReceivers(t *testing.T, test testCase) {
// Ensure receiver has its fields correctly populated.
require.NotNil(t, receiver)

assert.NotNil(t, receiver.receiver)
assert.NotNil(t, receiver.Receiver)

// Compose the list of created exporters.
var exporters []component.Exporter
Expand All @@ -130,13 +130,13 @@ func testReceivers(t *testing.T, test testCase) {

td := testdata.GenerateTracesOneSpan()
if test.hasTraces {
traceProducer := receiver.receiver.(*testcomponents.ExampleReceiver)
traceProducer := receiver.Receiver.(*testcomponents.ExampleReceiver)
assert.NoError(t, traceProducer.ConsumeTraces(context.Background(), td))
}

md := testdata.GenerateMetricsOneMetric()
if test.hasMetrics {
metricsProducer := receiver.receiver.(*testcomponents.ExampleReceiver)
metricsProducer := receiver.Receiver.(*testcomponents.ExampleReceiver)
assert.NoError(t, metricsProducer.ConsumeMetrics(context.Background(), md))
}

Expand Down Expand Up @@ -209,7 +209,7 @@ func TestBuildReceiversBuildCustom(t *testing.T) {
// Ensure receiver has its fields correctly populated.
require.NotNil(t, receiver)

assert.NotNil(t, receiver.receiver)
assert.NotNil(t, receiver.Receiver)

// Compose the list of created exporters.
exporterIDs := []config.ComponentID{config.NewComponentID("exampleexporter")}
Expand All @@ -231,7 +231,7 @@ func TestBuildReceiversBuildCustom(t *testing.T) {

// Send one data.
log := plog.Logs{}
producer := receiver.receiver.(*testcomponents.ExampleReceiver)
producer := receiver.Receiver.(*testcomponents.ExampleReceiver)
require.NoError(t, producer.ConsumeLogs(context.Background(), log))

// Now verify received data.
Expand All @@ -251,7 +251,7 @@ func TestBuildReceivers_StartAll(t *testing.T) {

receivers[config.NewComponentID("example")] = &builtReceiver{
logger: zap.NewNop(),
receiver: receiver,
Receiver: receiver,
}

assert.False(t, receiver.Started)
Expand Down Expand Up @@ -282,40 +282,3 @@ func TestBuildReceivers_Unused(t *testing.T) {
assert.NoError(t, receivers.StartAll(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, receivers.ShutdownAll(context.Background()))
}

func TestBuildReceivers_NotSupportedDataType(t *testing.T) {
factories := createTestFactories()

tests := []struct {
configFile string
}{
{
configFile: "not_supported_receiver_logs.yaml",
},
{
configFile: "not_supported_receiver_metrics.yaml",
},
{
configFile: "not_supported_receiver_traces.yaml",
},
}

for _, test := range tests {
t.Run(test.configFile, func(t *testing.T) {

cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", test.configFile), factories)
assert.NoError(t, err)
require.NotNil(t, cfg)

allExporters, err := BuildExporters(context.Background(), componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, factories.Exporters)
assert.NoError(t, err)

pipelineProcessors, err := BuildPipelines(componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, allExporters, factories.Processors)
assert.NoError(t, err)

receivers, err := BuildReceivers(componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, pipelineProcessors, factories.Receivers)
assert.Error(t, err)
assert.Zero(t, len(receivers))
})
}
}

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit 35b844e

Please sign in to comment.