Skip to content

Commit

Permalink
Rename pipelineSettings fields with Builder (#7093)
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski authored Feb 2, 2023
1 parent 3f73651 commit 83599a7
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 23 deletions.
14 changes: 7 additions & 7 deletions service/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,10 @@ type pipelinesSettings struct {
Telemetry component.TelemetrySettings
BuildInfo component.BuildInfo

Receivers *receiver.Builder
Processors *processor.Builder
Exporters *exporter.Builder
Connectors *connector.Builder
ReceiverBuilder *receiver.Builder
ProcessorBuilder *processor.Builder
ExporterBuilder *exporter.Builder
ConnectorBuilder *connector.Builder

// PipelineConfigs is a map of component.ID to PipelineConfig.
PipelineConfigs map[component.ID]*PipelineConfig
Expand Down Expand Up @@ -245,7 +245,7 @@ func buildPipelines(ctx context.Context, set pipelinesSettings) (pipelines, erro
BuildInfo: set.BuildInfo,
}
cSet.TelemetrySettings.Logger = components.ExporterLogger(set.Telemetry.Logger, expID, pipelineID.Type())
exp, err := buildExporter(ctx, cSet, set.Exporters, pipelineID)
exp, err := buildExporter(ctx, cSet, set.ExporterBuilder, pipelineID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -279,7 +279,7 @@ func buildPipelines(ctx context.Context, set pipelinesSettings) (pipelines, erro
BuildInfo: set.BuildInfo,
}
cSet.TelemetrySettings.Logger = components.ProcessorLogger(set.Telemetry.Logger, procID, pipelineID)
proc, err := buildProcessor(ctx, cSet, set.Processors, pipelineID, bp.lastConsumer)
proc, err := buildProcessor(ctx, cSet, set.ProcessorBuilder, pipelineID, bp.lastConsumer)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -336,7 +336,7 @@ func buildPipelines(ctx context.Context, set pipelinesSettings) (pipelines, erro
BuildInfo: set.BuildInfo,
}
cSet.TelemetrySettings.Logger = components.ReceiverLogger(set.Telemetry.Logger, recvID, pipelineID.Type())
recv, err := buildReceiver(ctx, cSet, set.Receivers, pipelineID, receiversConsumers[pipelineID.Type()][recvID])
recv, err := buildReceiver(ctx, cSet, set.ReceiverBuilder, pipelineID, receiversConsumers[pipelineID.Type()][recvID])
if err != nil {
return nil, err
}
Expand Down
18 changes: 9 additions & 9 deletions service/pipelines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,23 +184,23 @@ func TestBuildPipelines(t *testing.T) {
pips, err := buildPipelines(context.Background(), pipelinesSettings{
Telemetry: componenttest.NewNopTelemetrySettings(),
BuildInfo: component.NewDefaultBuildInfo(),
Receivers: receiver.NewBuilder(
ReceiverBuilder: receiver.NewBuilder(
map[component.ID]component.Config{
component.NewID("examplereceiver"): testcomponents.ExampleReceiverFactory.CreateDefaultConfig(),
component.NewIDWithName("examplereceiver", "1"): testcomponents.ExampleReceiverFactory.CreateDefaultConfig(),
},
map[component.Type]receiver.Factory{
testcomponents.ExampleReceiverFactory.Type(): testcomponents.ExampleReceiverFactory,
}),
Processors: processor.NewBuilder(
ProcessorBuilder: processor.NewBuilder(
map[component.ID]component.Config{
component.NewID("exampleprocessor"): testcomponents.ExampleProcessorFactory.CreateDefaultConfig(),
component.NewIDWithName("exampleprocessor", "1"): testcomponents.ExampleProcessorFactory.CreateDefaultConfig(),
},
map[component.Type]processor.Factory{
testcomponents.ExampleProcessorFactory.Type(): testcomponents.ExampleProcessorFactory,
}),
Exporters: exporter.NewBuilder(
ExporterBuilder: exporter.NewBuilder(
map[component.ID]component.Config{
component.NewID("exampleexporter"): testcomponents.ExampleExporterFactory.CreateDefaultConfig(),
component.NewIDWithName("exampleexporter", "1"): testcomponents.ExampleExporterFactory.CreateDefaultConfig(),
Expand Down Expand Up @@ -579,19 +579,19 @@ func TestBuildErrors(t *testing.T) {
set := pipelinesSettings{
Telemetry: componenttest.NewNopTelemetrySettings(),
BuildInfo: component.NewDefaultBuildInfo(),
Receivers: receiver.NewBuilder(
ReceiverBuilder: receiver.NewBuilder(
test.ReceiverConfigs,
map[component.Type]receiver.Factory{
nopReceiverFactory.Type(): nopReceiverFactory,
badReceiverFactory.Type(): badReceiverFactory,
}),
Processors: processor.NewBuilder(
ProcessorBuilder: processor.NewBuilder(
test.ProcessorConfigs,
map[component.Type]processor.Factory{
nopProcessorFactory.Type(): nopProcessorFactory,
badProcessorFactory.Type(): badProcessorFactory,
}),
Exporters: exporter.NewBuilder(
ExporterBuilder: exporter.NewBuilder(
test.ExporterConfigs,
map[component.Type]exporter.Factory{
nopExporterFactory.Type(): nopExporterFactory,
Expand All @@ -617,7 +617,7 @@ func TestFailToStartAndShutdown(t *testing.T) {
set := pipelinesSettings{
Telemetry: componenttest.NewNopTelemetrySettings(),
BuildInfo: component.NewDefaultBuildInfo(),
Receivers: receiver.NewBuilder(
ReceiverBuilder: receiver.NewBuilder(
map[component.ID]component.Config{
component.NewID(nopReceiverFactory.Type()): nopReceiverFactory.CreateDefaultConfig(),
component.NewID(errReceiverFactory.Type()): errReceiverFactory.CreateDefaultConfig(),
Expand All @@ -626,7 +626,7 @@ func TestFailToStartAndShutdown(t *testing.T) {
nopReceiverFactory.Type(): nopReceiverFactory,
errReceiverFactory.Type(): errReceiverFactory,
}),
Processors: processor.NewBuilder(
ProcessorBuilder: processor.NewBuilder(
map[component.ID]component.Config{
component.NewID(nopProcessorFactory.Type()): nopProcessorFactory.CreateDefaultConfig(),
component.NewID(errProcessorFactory.Type()): errProcessorFactory.CreateDefaultConfig(),
Expand All @@ -635,7 +635,7 @@ func TestFailToStartAndShutdown(t *testing.T) {
nopProcessorFactory.Type(): nopProcessorFactory,
errProcessorFactory.Type(): errProcessorFactory,
}),
Exporters: exporter.NewBuilder(
ExporterBuilder: exporter.NewBuilder(
map[component.ID]component.Config{
component.NewID(nopExporterFactory.Type()): nopExporterFactory.CreateDefaultConfig(),
component.NewID(errExporterFactory.Type()): errExporterFactory.CreateDefaultConfig(),
Expand Down
14 changes: 7 additions & 7 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,13 +190,13 @@ func (srv *Service) initExtensionsAndPipeline(ctx context.Context, set Settings,
}

pSet := pipelinesSettings{
Telemetry: srv.telemetrySettings,
BuildInfo: srv.buildInfo,
Receivers: set.Receivers,
Processors: set.Processors,
Exporters: set.Exporters,
Connectors: set.Connectors,
PipelineConfigs: cfg.Pipelines,
Telemetry: srv.telemetrySettings,
BuildInfo: srv.buildInfo,
ReceiverBuilder: set.Receivers,
ProcessorBuilder: set.Processors,
ExporterBuilder: set.Exporters,
ConnectorBuilder: set.Connectors,
PipelineConfigs: cfg.Pipelines,
}

if sharedgate.ConnectorsFeatureGate.IsEnabled() {
Expand Down

0 comments on commit 83599a7

Please sign in to comment.