Skip to content

Commit

Permalink
Add entry point to build graph-based pipelines
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Jan 25, 2023
1 parent 1af31a0 commit 46be367
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 13 deletions.
11 changes: 2 additions & 9 deletions otelcol/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/service"
)

Expand All @@ -28,12 +27,6 @@ var (
errMissingReceivers = errors.New("no receiver configuration specified in config")
)

var connectorsFeatureGate = featuregate.GlobalRegistry().MustRegister(
"otelcol.enableConnectors",
featuregate.StageAlpha,
featuregate.WithRegisterDescription("Enables 'connectors', a new type of component for transmitting signals between pipelines."),
featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector/issues/2336"))

// Config defines the configuration for the various elements of collector or agent.
type Config struct {
// Receivers is a map of ComponentID to Receivers.
Expand Down Expand Up @@ -114,8 +107,8 @@ func (cfg *Config) Validate() error {
}
}

if len(cfg.Connectors) != 0 && !connectorsFeatureGate.IsEnabled() {
return fmt.Errorf("connectors require feature gate: %s", connectorsFeatureGate.ID())
if len(cfg.Connectors) != 0 && !service.ConnectorsFeatureGate.IsEnabled() {
return fmt.Errorf("connectors require feature gate: %s", service.ConnectorsFeatureGate.ID())
}

if err := cfg.Service.Validate(); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions otelcol/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,9 @@ func TestConfigValidate(t *testing.T) {
},
}

require.NoError(t, featuregate.GlobalRegistry().Apply(map[string]bool{connectorsFeatureGate.ID(): true}))
require.NoError(t, featuregate.GlobalRegistry().Apply(map[string]bool{service.ConnectorsFeatureGate.ID(): true}))
defer func() {
require.NoError(t, featuregate.GlobalRegistry().Apply(map[string]bool{connectorsFeatureGate.ID(): false}))
require.NoError(t, featuregate.GlobalRegistry().Apply(map[string]bool{service.ConnectorsFeatureGate.ID(): false}))
}()
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions service/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package service // import "go.opentelemetry.io/collector/service"

import (
"context"
"errors"
"net/http"

"go.uber.org/multierr"
Expand All @@ -32,6 +33,11 @@ type pipelinesGraph struct {
componentGraph *simple.DirectedGraph
}

func buildPipelinesGraph(_ context.Context, _ pipelinesSettings) (pipelines, error) {
err := errors.New("not yet implemented")
return &pipelinesGraph{componentGraph: simple.NewDirectedGraph()}, err
}

func (g *pipelinesGraph) StartAll(ctx context.Context, host component.Host) error {
nodes, err := topo.Sort(g.componentGraph)
if err != nil {
Expand Down
19 changes: 17 additions & 2 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/internal/obsreportconfig"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/receiver"
Expand All @@ -36,6 +37,12 @@ import (
"go.opentelemetry.io/collector/service/telemetry"
)

var ConnectorsFeatureGate = featuregate.GlobalRegistry().MustRegister(
"otelcol.enableConnectors",
featuregate.StageAlpha,
featuregate.WithRegisterDescription("Enables 'connectors', a new type of component for transmitting signals between pipelines."),
featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector/issues/2336"))

// Settings holds configuration for building a new service.
type Settings struct {
// BuildInfo provides collector start information.
Expand Down Expand Up @@ -194,10 +201,18 @@ func (srv *Service) initExtensionsAndPipeline(ctx context.Context, set Settings,
Receivers: set.Receivers,
Processors: set.Processors,
Exporters: set.Exporters,
Connectors: set.Connectors,
PipelineConfigs: cfg.Pipelines,
}
if srv.host.pipelines, err = buildPipelines(ctx, pSet); err != nil {
return fmt.Errorf("cannot build pipelines: %w", err)

if ConnectorsFeatureGate.IsEnabled() {
if srv.host.pipelines, err = buildPipelinesGraph(ctx, pSet); err != nil {
return fmt.Errorf("cannot build pipelines: %w", err)
}
} else {
if srv.host.pipelines, err = buildPipelines(ctx, pSet); err != nil {
return fmt.Errorf("cannot build pipelines: %w", err)
}
}

if cfg.Telemetry.Metrics.Level != configtelemetry.LevelNone && cfg.Telemetry.Metrics.Address != "" {
Expand Down

0 comments on commit 46be367

Please sign in to comment.