Skip to content

Commit

Permalink
Call service.New to validate pipeline types
Browse files Browse the repository at this point in the history
  • Loading branch information
ycombinator committed Mar 5, 2024
1 parent d9e74d8 commit e373dd9
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 102 deletions.
7 changes: 6 additions & 1 deletion otelcol/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,5 +329,10 @@ func (col *Collector) validatePipelineCfg(ctx context.Context, cfg *Config, fact
Extensions: extension.NewBuilder(cfg.Extensions, factories.Extensions),
}

return service.Validate(ctx, set, cfg.Service)
_, err := service.New(ctx, set, cfg.Service)
if err != nil {
return err
}

return nil
}
41 changes: 33 additions & 8 deletions otelcol/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,16 +421,41 @@ func TestCollectorClosedStateOnStartUpError(t *testing.T) {
}

func TestCollectorDryRun(t *testing.T) {
// Load a bad config causing startup to fail
set := CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProviderSettings: newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-invalid.yaml")}),
tests := map[string]struct {
settings CollectorSettings
expectedErr string
}{
"invalid_processor": {
settings: CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProviderSettings: newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-invalid.yaml")}),
},
expectedErr: `service::pipelines::traces: references processor "invalid" which is not configured`,
},
"logs_receiver_traces_pipeline": {
settings: CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProviderSettings: newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-invalid-receiver-type.yaml")}),
},
expectedErr: `failed to build pipelines: failed to create "nop_logs" receiver for data type "traces": telemetry type is not supported`,
},
}
col, err := NewCollector(set)
require.NoError(t, err)

require.Error(t, col.DryRun(context.Background()))
for name, test := range tests {
t.Run(name, func(t *testing.T) {
col, err := NewCollector(test.settings)
require.NoError(t, err)

err = col.DryRun(context.Background())
if test.expectedErr == "" {
require.NoError(t, err)
} else {
require.EqualError(t, err, test.expectedErr)
}
})
}
}

func TestPassConfmapToServiceFailure(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion otelcol/factories_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func nopFactories() (Factories, error) {
return Factories{}, err
}

if factories.Receivers, err = receiver.MakeFactoryMap(receivertest.NewNopFactory()); err != nil {
if factories.Receivers, err = receiver.MakeFactoryMap(receivertest.NewNopFactory(), receivertest.NewNopLogsFactory()); err != nil {
return Factories{}, err
}

Expand Down
18 changes: 18 additions & 0 deletions otelcol/testdata/otelcol-invalid-receiver-type.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
receivers:
nop_logs:

processors:
nop:

exporters:
nop:

service:
telemetry:
metrics:
address: localhost:8888
pipelines:
traces:
receivers: [nop_logs]
processors: [nop]
exporters: [nop]
54 changes: 54 additions & 0 deletions receiver/receivertest/nop_logs_receiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package receivertest // import "go.opentelemetry.io/collector/receiver/receivertest"

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
)

var nopLogsComponentType = component.MustNewType("nop_logs")

// NewNopLogsCreateSettings returns a new nop settings for Create*Receiver functions.
func NewNopLogsCreateSettings() receiver.CreateSettings {
return receiver.CreateSettings{
ID: component.NewID(nopLogsComponentType),
TelemetrySettings: componenttest.NewNopTelemetrySettings(),
BuildInfo: component.NewDefaultBuildInfo(),
}
}

// NewNopLogsFactory returns a receiver.Factory that constructs nop logs receivers.
func NewNopLogsFactory() receiver.Factory {
createLogs := func(context.Context, receiver.CreateSettings, component.Config, consumer.Logs) (receiver.Logs, error) {
return nopInstance, nil
}

return receiver.NewFactory(
nopLogsComponentType,
func() component.Config { return &nopLogsConfig{} },
receiver.WithLogs(createLogs, component.StabilityLevelStable))
}

type nopLogsConfig struct{}

var nopLogsInstance = &nopLogsReceiver{}

// nopLogsReceiver acts as a receiver for testing purposes.
type nopLogsReceiver struct {
component.StartFunc
component.ShutdownFunc
}

// NewNopLogsBuilder returns a receiver.Builder that constructs nop receivers.
func NewNopLogsBuilder() *receiver.Builder {
nopFactory := NewNopLogsFactory()
return receiver.NewBuilder(
map[component.ID]component.Config{component.NewID(componentType): nopFactory.CreateDefaultConfig()},
map[component.Type]receiver.Factory{componentType: nopFactory})
}
32 changes: 0 additions & 32 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"fmt"
"runtime"

"go.opentelemetry.io/otel/metric/noop"
sdkresource "go.opentelemetry.io/otel/sdk/resource"
"go.uber.org/multierr"
"go.uber.org/zap"
Expand Down Expand Up @@ -134,37 +133,6 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
return srv, nil
}

// Validate validates the service. Validation fails if all the components in every
// service pipeline don't support the same signal type as that of the pipeline.
func Validate(ctx context.Context, set Settings, cfg Config) error {
tel, err := telemetry.New(ctx, telemetry.Settings{ZapOptions: set.LoggingOptions}, cfg.Telemetry)
if err != nil {
return fmt.Errorf("failed to get logger: %w", err)
}

telSettings := servicetelemetry.TelemetrySettings{
Logger: tel.Logger(),
TracerProvider: tel.TracerProvider(),
MeterProvider: noop.NewMeterProvider(),
}

pSet := graph.Settings{
Telemetry: telSettings,
BuildInfo: set.BuildInfo,
ReceiverBuilder: set.Receivers,
ProcessorBuilder: set.Processors,
ExporterBuilder: set.Exporters,
ConnectorBuilder: set.Connectors,
PipelineConfigs: cfg.Pipelines,
}

if _, err := graph.Build(ctx, pSet); err != nil {
return fmt.Errorf("failed to build pipelines for validation: %w", err)
}

return nil
}

// Start starts the extensions and pipelines. If Start fails Shutdown should be called to ensure a clean state.
// Start does the following steps in order:
// 1. Start all extensions.
Expand Down
60 changes: 0 additions & 60 deletions service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,13 @@ import (
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/connector/connectortest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/extension/extensiontest"
"go.opentelemetry.io/collector/extension/zpagesextension"
"go.opentelemetry.io/collector/internal/testutil"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/processor/processortest"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receivertest"
"go.opentelemetry.io/collector/service/extensions"
"go.opentelemetry.io/collector/service/pipelines"
Expand Down Expand Up @@ -432,64 +430,6 @@ func TestServiceFatalError(t *testing.T) {
require.ErrorIs(t, err, assert.AnError)
}

func TestServiceValidate(t *testing.T) {
tests := map[string]struct {
createSettings func() Settings
config Config

expectedErr string
}{
"same_types": {
createSettings: newNopSettings,
config: newNopConfig(),
expectedErr: "",
},
"different_types": {
createSettings: func() Settings {
const typeStr = "nop"

createMetricsReceiver := func(_ context.Context, _ receiver.CreateSettings, _ component.Config, _ consumer.Metrics) (receiver.Metrics, error) {
return struct {
component.StartFunc
component.ShutdownFunc
}{}, nil
}

createDefaultConfig := func() component.Config {
return &struct{}{}
}

metricsOnlyReceiverFactory := receiver.NewFactory(
typeStr,
createDefaultConfig,
receiver.WithMetrics(createMetricsReceiver, component.StabilityLevelStable),
)

settings := newNopSettings()
settings.Receivers = receiver.NewBuilder(
map[component.ID]component.Config{component.NewID(typeStr): metricsOnlyReceiverFactory.CreateDefaultConfig()},
map[component.Type]receiver.Factory{typeStr: metricsOnlyReceiverFactory},
)

return settings
},
config: newNopConfig(),
expectedErr: `telemetry type is not supported`,
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
err := Validate(context.Background(), test.createSettings(), test.config)
if test.expectedErr == "" {
require.NoError(t, err)
} else {
require.ErrorContains(t, err, test.expectedErr)
}
})
}
}

func assertResourceLabels(t *testing.T, res pcommon.Resource, expectedLabels map[string]labelValue) {
for key, labelValue := range expectedLabels {
lookupKey, ok := prometheusToOtelConv[key]
Expand Down

0 comments on commit e373dd9

Please sign in to comment.