Skip to content

Commit

Permalink
Validate that pipeline components are of same signal type as pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
ycombinator committed Jan 11, 2024
1 parent eba6c76 commit 890b2ba
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 1 deletion.
21 changes: 20 additions & 1 deletion otelcol/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,11 @@ func (col *Collector) DryRun(ctx context.Context) error {
return fmt.Errorf("failed to get config: %w", err)
}

return cfg.Validate()
if err := cfg.Validate(); err != nil {
return err
}

return col.validatePipelineCfg(ctx, cfg, factories)
}

// Run starts the collector according to the given configuration, and waits for it to complete.
Expand Down Expand Up @@ -298,3 +302,18 @@ func (col *Collector) shutdown(ctx context.Context) error {
func (col *Collector) setCollectorState(state State) {
col.state.Store(int32(state))
}

// validatePipelineConfig validates that the components in a pipeline support the
// signal type of the pipeline. For example, this function will return an error if
// a metrics pipeline has non-metrics components.
func (col *Collector) validatePipelineCfg(ctx context.Context, cfg *Config, factories Factories) error {
set := service.Settings{
Receivers: receiver.NewBuilder(cfg.Receivers, factories.Receivers),
Processors: processor.NewBuilder(cfg.Processors, factories.Processors),
Exporters: exporter.NewBuilder(cfg.Exporters, factories.Exporters),
Connectors: connector.NewBuilder(cfg.Connectors, factories.Connectors),
Extensions: extension.NewBuilder(cfg.Extensions, factories.Extensions),
}

return service.Validate(ctx, set, cfg.Service)
}
30 changes: 30 additions & 0 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,36 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
return srv, nil
}

func Validate(ctx context.Context, set Settings, cfg Config) error {
tel, err := telemetry.New(ctx, telemetry.Settings{ZapOptions: set.LoggingOptions}, cfg.Telemetry)
if err != nil {
return fmt.Errorf("failed to get logger: %w", err)
}

telSettings := servicetelemetry.TelemetrySettings{
Logger: tel.Logger(),
TracerProvider: tel.TracerProvider(),
MeterProvider: noop.NewMeterProvider(),
}

pSet := graph.Settings{
Telemetry: telSettings,
BuildInfo: set.BuildInfo,
ReceiverBuilder: set.Receivers,
ProcessorBuilder: set.Processors,
ExporterBuilder: set.Exporters,
ConnectorBuilder: set.Connectors,
PipelineConfigs: cfg.Pipelines,
}

_, err = graph.Build(ctx, pSet)
if err != nil {
return fmt.Errorf("failed to build pipelines: %w", err)
}

return nil
}

// Start starts the extensions and pipelines. If Start fails Shutdown should be called to ensure a clean state.
// Start does the following steps in order:
// 1. Start all extensions.
Expand Down
Empty file added test.yaml
Empty file.

0 comments on commit 890b2ba

Please sign in to comment.