From 259dda89b36429f8497dac2c4bfd0aed77a24f74 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Tue, 15 Nov 2022 13:05:42 -0800 Subject: [PATCH] Draft minimum changes to component.Host Signed-off-by: Bogdan Drutu --- .chloggen/deprecatecomp.yaml | 11 ++++++ .chloggen/draftminhost.yaml | 11 ++++++ component/componenttest/nop_host.go | 4 +-- component/exporter.go | 12 +++---- component/extension.go | 4 +-- component/host.go | 4 +-- component/processor.go | 13 +++---- component/receiver.go | 11 +++--- config/configauth/configauth.go | 4 +-- config/configauth/configauth_test.go | 8 ++--- config/configgrpc/configgrpc_test.go | 14 ++++---- config/confighttp/confighttp_test.go | 24 ++++++------- exporter/exporterhelper/queued_retry.go | 2 +- exporter/exporterhelper/queued_retry_test.go | 12 +++---- .../memorylimiter_test.go | 4 +-- receiver/scraperhelper/scrapercontroller.go | 2 +- service/extensions/extensions.go | 4 +-- service/host.go | 4 +-- service/internal/pipelines/pipelines.go | 34 +++++++++---------- 19 files changed, 98 insertions(+), 84 deletions(-) create mode 100755 .chloggen/deprecatecomp.yaml create mode 100755 .chloggen/draftminhost.yaml diff --git a/.chloggen/deprecatecomp.yaml b/.chloggen/deprecatecomp.yaml new file mode 100755 index 00000000000..42673312f4e --- /dev/null +++ b/.chloggen/deprecatecomp.yaml @@ -0,0 +1,11 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: deprecation + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: component + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Deprecate `component.Receiver`, `component.Processor`, and `component.Exporter`. + +# One or more tracking issues or pull requests related to the change +issues: [6553] diff --git a/.chloggen/draftminhost.yaml b/.chloggen/draftminhost.yaml new file mode 100755 index 00000000000..1add1f5d892 --- /dev/null +++ b/.chloggen/draftminhost.yaml @@ -0,0 +1,11 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: component + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: `component.Extension` will be reverted to not be an alias. Change your `component.Host` implementation. + +# One or more tracking issues or pull requests related to the change +issues: [6553] diff --git a/component/componenttest/nop_host.go b/component/componenttest/nop_host.go index b535b674af6..fc1fdac2265 100644 --- a/component/componenttest/nop_host.go +++ b/component/componenttest/nop_host.go @@ -32,10 +32,10 @@ func (nh *nopHost) GetFactory(_ component.Kind, _ component.Type) component.Fact return nil } -func (nh *nopHost) GetExtensions() map[component.ID]component.Extension { +func (nh *nopHost) GetExtensions() map[component.ID]component.Component { return nil } -func (nh *nopHost) GetExporters() map[component.DataType]map[component.ID]component.Exporter { +func (nh *nopHost) GetExporters() map[component.DataType]map[component.ID]component.Component { return nil } diff --git a/component/exporter.go b/component/exporter.go index fb08acabe6b..01fc3fc1e07 100644 --- a/component/exporter.go +++ b/component/exporter.go @@ -34,26 +34,24 @@ func UnmarshalExporterConfig(conf *confmap.Conf, cfg ExporterConfig) error { return unmarshal(conf, cfg) } -// Exporter exports telemetry data from the collector to a destination. -type Exporter interface { - Component -} +// Deprecated: [v0.65.0] unnecessary interface, will be removed. +type Exporter = Component // TracesExporter is an Exporter that can consume traces. type TracesExporter interface { - Exporter + Component consumer.Traces } // MetricsExporter is an Exporter that can consume metrics. type MetricsExporter interface { - Exporter + Component consumer.Metrics } // LogsExporter is an Exporter that can consume logs. type LogsExporter interface { - Exporter + Component consumer.Logs } diff --git a/component/extension.go b/component/extension.go index 4ddf08fe75b..d35a95c2519 100644 --- a/component/extension.go +++ b/component/extension.go @@ -36,9 +36,7 @@ func UnmarshalExtensionConfig(conf *confmap.Conf, cfg ExtensionConfig) error { // Extension is the interface for objects hosted by the OpenTelemetry Collector that // don't participate directly on data pipelines but provide some functionality // to the service, examples: health check endpoint, z-pages, etc. -type Extension interface { - Component -} +type Extension = Component // PipelineWatcher is an extra interface for Extension hosted by the OpenTelemetry // Collector that is to be implemented by extensions interested in changes to pipeline diff --git a/component/host.go b/component/host.go index 92c395b0306..9cff1172315 100644 --- a/component/host.go +++ b/component/host.go @@ -45,7 +45,7 @@ type Host interface { // // GetExtensions can be called by the component anytime after Component.Start() begins and // until Component.Shutdown() ends. - GetExtensions() map[ID]Extension + GetExtensions() map[ID]Component // GetExporters returns the map of exporters. Only enabled and created exporters will be returned. // Typically is used to find exporters by type or by full config name. Both cases @@ -58,5 +58,5 @@ type Host interface { // // GetExporters can be called by the component anytime after Component.Start() begins and // until Component.Shutdown() ends. - GetExporters() map[DataType]map[ID]Exporter + GetExporters() map[DataType]map[ID]Component } diff --git a/component/processor.go b/component/processor.go index 96106c7158f..7c8e878fb5b 100644 --- a/component/processor.go +++ b/component/processor.go @@ -34,27 +34,24 @@ func UnmarshalProcessorConfig(conf *confmap.Conf, cfg ProcessorConfig) error { return unmarshal(conf, cfg) } -// Processor defines the common functions that must be implemented by TracesProcessor -// and MetricsProcessor. -type Processor interface { - Component -} +// Deprecated: [v0.65.0] unnecessary interface, will be removed. +type Processor = Component // TracesProcessor is a processor that can consume traces. type TracesProcessor interface { - Processor + Component consumer.Traces } // MetricsProcessor is a processor that can consume metrics. type MetricsProcessor interface { - Processor + Component consumer.Metrics } // LogsProcessor is a processor that can consume logs. type LogsProcessor interface { - Processor + Component consumer.Logs } diff --git a/component/receiver.go b/component/receiver.go index 44337c45e72..874823d25e4 100644 --- a/component/receiver.go +++ b/component/receiver.go @@ -72,9 +72,8 @@ func UnmarshalReceiverConfig(conf *confmap.Conf, cfg ReceiverConfig) error { // // This ensures there are strong delivery guarantees once the data is acknowledged // by the Collector. -type Receiver interface { - Component -} +// Deprecated: [v0.65.0] unnecessary interface, will be removed. +type Receiver = Component // A TracesReceiver receives traces. // Its purpose is to translate data from any format to the collector's internal trace format. @@ -82,7 +81,7 @@ type Receiver interface { // // For example it could be Zipkin data source which translates Zipkin spans into ptrace.Traces. type TracesReceiver interface { - Receiver + Component } // A MetricsReceiver receives metrics. @@ -91,7 +90,7 @@ type TracesReceiver interface { // // For example it could be Prometheus data source which translates Prometheus metrics into pmetric.Metrics. type MetricsReceiver interface { - Receiver + Component } // A LogsReceiver receives logs. @@ -100,7 +99,7 @@ type MetricsReceiver interface { // // For example a LogsReceiver can read syslogs and convert them into plog.Logs. type LogsReceiver interface { - Receiver + Component } // ReceiverCreateSettings configures Receiver creators. diff --git a/config/configauth/configauth.go b/config/configauth/configauth.go index 30a440497b5..927d9c4be5e 100644 --- a/config/configauth/configauth.go +++ b/config/configauth/configauth.go @@ -35,7 +35,7 @@ type Authentication struct { // GetServerAuthenticator attempts to select the appropriate ServerAuthenticator from the list of extensions, // based on the requested extension name. If an authenticator is not found, an error is returned. -func (a Authentication) GetServerAuthenticator(extensions map[component.ID]component.Extension) (ServerAuthenticator, error) { +func (a Authentication) GetServerAuthenticator(extensions map[component.ID]component.Component) (ServerAuthenticator, error) { if ext, found := extensions[a.AuthenticatorID]; found { if auth, ok := ext.(ServerAuthenticator); ok { return auth, nil @@ -49,7 +49,7 @@ func (a Authentication) GetServerAuthenticator(extensions map[component.ID]compo // GetClientAuthenticator attempts to select the appropriate ClientAuthenticator from the list of extensions, // based on the component id of the extension. If an authenticator is not found, an error is returned. // This should be only used by HTTP clients. -func (a Authentication) GetClientAuthenticator(extensions map[component.ID]component.Extension) (ClientAuthenticator, error) { +func (a Authentication) GetClientAuthenticator(extensions map[component.ID]component.Component) (ClientAuthenticator, error) { if ext, found := extensions[a.AuthenticatorID]; found { if auth, ok := ext.(ClientAuthenticator); ok { return auth, nil diff --git a/config/configauth/configauth_test.go b/config/configauth/configauth_test.go index 04707e699ef..e664abafc6e 100644 --- a/config/configauth/configauth_test.go +++ b/config/configauth/configauth_test.go @@ -45,7 +45,7 @@ func TestGetServerAuthenticator(t *testing.T) { cfg := &Authentication{ AuthenticatorID: component.NewID("mock"), } - ext := map[component.ID]component.Extension{ + ext := map[component.ID]component.Component{ component.NewID("mock"): tC.authenticator, } @@ -68,7 +68,7 @@ func TestGetServerAuthenticatorFails(t *testing.T) { AuthenticatorID: component.NewID("does-not-exist"), } - authenticator, err := cfg.GetServerAuthenticator(map[component.ID]component.Extension{}) + authenticator, err := cfg.GetServerAuthenticator(map[component.ID]component.Component{}) assert.ErrorIs(t, err, errAuthenticatorNotFound) assert.Nil(t, authenticator) } @@ -96,7 +96,7 @@ func TestGetClientAuthenticator(t *testing.T) { cfg := &Authentication{ AuthenticatorID: component.NewID("mock"), } - ext := map[component.ID]component.Extension{ + ext := map[component.ID]component.Component{ component.NewID("mock"): tC.authenticator, } @@ -118,7 +118,7 @@ func TestGetClientAuthenticatorFails(t *testing.T) { cfg := &Authentication{ AuthenticatorID: component.NewID("does-not-exist"), } - authenticator, err := cfg.GetClientAuthenticator(map[component.ID]component.Extension{}) + authenticator, err := cfg.GetClientAuthenticator(map[component.ID]component.Component{}) assert.ErrorIs(t, err, errAuthenticatorNotFound) assert.Nil(t, authenticator) } diff --git a/config/configgrpc/configgrpc_test.go b/config/configgrpc/configgrpc_test.go index eeaa93bf4af..d4784671904 100644 --- a/config/configgrpc/configgrpc_test.go +++ b/config/configgrpc/configgrpc_test.go @@ -91,7 +91,7 @@ func TestAllGrpcClientSettings(t *testing.T) { Auth: &configauth.Authentication{AuthenticatorID: component.NewID("testauth")}, }, host: &mockHost{ - ext: map[component.ID]component.Extension{ + ext: map[component.ID]component.Component{ component.NewID("testauth"): &configauth.MockClientAuthenticator{}, }, }, @@ -119,7 +119,7 @@ func TestAllGrpcClientSettings(t *testing.T) { Auth: &configauth.Authentication{AuthenticatorID: component.NewID("testauth")}, }, host: &mockHost{ - ext: map[component.ID]component.Extension{ + ext: map[component.ID]component.Component{ component.NewID("testauth"): &configauth.MockClientAuthenticator{}, }, }, @@ -147,7 +147,7 @@ func TestAllGrpcClientSettings(t *testing.T) { Auth: &configauth.Authentication{AuthenticatorID: component.NewID("testauth")}, }, host: &mockHost{ - ext: map[component.ID]component.Extension{ + ext: map[component.ID]component.Component{ component.NewID("testauth"): &configauth.MockClientAuthenticator{}, }, }, @@ -216,7 +216,7 @@ func TestGrpcServerAuthSettings(t *testing.T) { AuthenticatorID: component.NewID("mock"), } host := &mockHost{ - ext: map[component.ID]component.Extension{ + ext: map[component.ID]component.Component{ component.NewID("mock"): configauth.NewServerAuthenticator(), }, } @@ -295,7 +295,7 @@ func TestGRPCClientSettingsError(t *testing.T) { Endpoint: "localhost:1234", Auth: &configauth.Authentication{AuthenticatorID: component.NewID("doesntexist")}, }, - host: &mockHost{ext: map[component.ID]component.Extension{}}, + host: &mockHost{ext: map[component.ID]component.Component{}}, }, { err: "no extensions configuration available", @@ -1075,9 +1075,9 @@ func tempSocketName(t *testing.T) string { type mockHost struct { component.Host - ext map[component.ID]component.Extension + ext map[component.ID]component.Component } -func (nh *mockHost) GetExtensions() map[component.ID]component.Extension { +func (nh *mockHost) GetExtensions() map[component.ID]component.Component { return nh.ext } diff --git a/config/confighttp/confighttp_test.go b/config/confighttp/confighttp_test.go index 26cbd24e0aa..a0acdbd53bc 100644 --- a/config/confighttp/confighttp_test.go +++ b/config/confighttp/confighttp_test.go @@ -50,7 +50,7 @@ func (c *customRoundTripper) RoundTrip(request *http.Request) (*http.Response, e func TestAllHTTPClientSettings(t *testing.T) { host := &mockHost{ - ext: map[component.ID]component.Extension{ + ext: map[component.ID]component.Component{ component.NewID("testauth"): &configauth.MockClientAuthenticator{ResultRoundTripper: &customRoundTripper{}}, }, } @@ -160,7 +160,7 @@ func TestAllHTTPClientSettings(t *testing.T) { func TestPartialHTTPClientSettings(t *testing.T) { host := &mockHost{ - ext: map[component.ID]component.Extension{ + ext: map[component.ID]component.Component{ component.NewID("testauth"): &configauth.MockClientAuthenticator{ResultRoundTripper: &customRoundTripper{}}, }, } @@ -211,7 +211,7 @@ func TestDefaultHTTPClientSettings(t *testing.T) { func TestHTTPClientSettingsError(t *testing.T) { host := &mockHost{ - ext: map[component.ID]component.Extension{}, + ext: map[component.ID]component.Component{}, } tests := []struct { settings HTTPClientSettings @@ -274,7 +274,7 @@ func TestHTTPClientSettingWithAuthConfig(t *testing.T) { }, shouldErr: false, host: &mockHost{ - ext: map[component.ID]component.Extension{ + ext: map[component.ID]component.Component{ component.NewID("mock"): &configauth.MockClientAuthenticator{ ResultRoundTripper: &customRoundTripper{}, }, @@ -289,7 +289,7 @@ func TestHTTPClientSettingWithAuthConfig(t *testing.T) { }, shouldErr: true, host: &mockHost{ - ext: map[component.ID]component.Extension{ + ext: map[component.ID]component.Component{ component.NewID("mock"): &configauth.MockClientAuthenticator{ResultRoundTripper: &customRoundTripper{}}, }, }, @@ -311,7 +311,7 @@ func TestHTTPClientSettingWithAuthConfig(t *testing.T) { }, shouldErr: false, host: &mockHost{ - ext: map[component.ID]component.Extension{ + ext: map[component.ID]component.Component{ component.NewID("mock"): &configauth.MockClientAuthenticator{ResultRoundTripper: &customRoundTripper{}}, }, }, @@ -324,7 +324,7 @@ func TestHTTPClientSettingWithAuthConfig(t *testing.T) { }, shouldErr: true, host: &mockHost{ - ext: map[component.ID]component.Extension{ + ext: map[component.ID]component.Component{ component.NewID("mock"): &configauth.MockClientAuthenticator{ ResultRoundTripper: &customRoundTripper{}, MustError: true}, }, @@ -737,7 +737,7 @@ func TestHttpCorsWithAuthentication(t *testing.T) { } host := &mockHost{ - ext: map[component.ID]component.Extension{ + ext: map[component.ID]component.Component{ component.NewID("mock"): configauth.NewServerAuthenticator( configauth.WithAuthenticate(func(ctx context.Context, headers map[string][]string) (context.Context, error) { return ctx, errors.New("authentication failed") @@ -932,7 +932,7 @@ func TestServerAuth(t *testing.T) { } host := &mockHost{ - ext: map[component.ID]component.Extension{ + ext: map[component.ID]component.Component{ component.NewID("mock"): configauth.NewServerAuthenticator( configauth.WithAuthenticate(func(ctx context.Context, headers map[string][]string) (context.Context, error) { authCalled = true @@ -979,7 +979,7 @@ func TestFailedServerAuth(t *testing.T) { }, } host := &mockHost{ - ext: map[component.ID]component.Extension{ + ext: map[component.ID]component.Component{ component.NewID("mock"): configauth.NewServerAuthenticator( configauth.WithAuthenticate(func(ctx context.Context, headers map[string][]string) (context.Context, error) { return ctx, errors.New("authentication failed") @@ -1002,10 +1002,10 @@ func TestFailedServerAuth(t *testing.T) { type mockHost struct { component.Host - ext map[component.ID]component.Extension + ext map[component.ID]component.Component } -func (nh *mockHost) GetExtensions() map[component.ID]component.Extension { +func (nh *mockHost) GetExtensions() map[component.ID]component.Component { return nh.ext } diff --git a/exporter/exporterhelper/queued_retry.go b/exporter/exporterhelper/queued_retry.go index c4a93ce143a..96ecb0461cd 100644 --- a/exporter/exporterhelper/queued_retry.go +++ b/exporter/exporterhelper/queued_retry.go @@ -127,7 +127,7 @@ func newQueuedRetrySender(id component.ID, signal component.DataType, qCfg Queue return qrs } -func getStorageExtension(extensions map[component.ID]component.Extension, storageID component.ID) (storage.Extension, error) { +func getStorageExtension(extensions map[component.ID]component.Component, storageID component.ID) (storage.Extension, error) { if ext, found := extensions[storageID]; found { if storageExt, ok := ext.(storage.Extension); ok { return storageExt, nil diff --git a/exporter/exporterhelper/queued_retry_test.go b/exporter/exporterhelper/queued_retry_test.go index 574f088bf4d..e22a63c7b27 100644 --- a/exporter/exporterhelper/queued_retry_test.go +++ b/exporter/exporterhelper/queued_retry_test.go @@ -435,7 +435,7 @@ func TestGetRetrySettings(t *testing.T) { t.Run(tC.desc, func(t *testing.T) { storageID := component.NewIDWithName("file_storage", strconv.Itoa(tC.storageIndex)) - var extensions = map[component.ID]component.Extension{} + var extensions = map[component.ID]component.Component{} for i := 0; i < tC.numStorages; i++ { extensions[component.NewIDWithName("file_storage", strconv.Itoa(i))] = &mockStorageExtension{GetClientError: tC.getClientError} } @@ -466,7 +466,7 @@ func TestInvalidStorageExtensionType(t *testing.T) { settings := componenttest.NewNopExtensionCreateSettings() extension, err := factory.CreateExtension(context.Background(), settings, extConfig) assert.NoError(t, err) - var extensions = map[component.ID]component.Extension{ + var extensions = map[component.ID]component.Component{ storageID: extension, } host := &mockHost{ext: extensions} @@ -545,7 +545,7 @@ func TestQueuedRetryPersistenceEnabled(t *testing.T) { be, err := newBaseExporter(&defaultExporterCfg, tt.ToExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) require.NoError(t, err) - var extensions = map[component.ID]component.Extension{ + var extensions = map[component.ID]component.Component{ storageID: &mockStorageExtension{}, } host := &mockHost{ext: extensions} @@ -568,7 +568,7 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) { be, err := newBaseExporter(&defaultExporterCfg, tt.ToExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) require.NoError(t, err) - var extensions = map[component.ID]component.Extension{ + var extensions = map[component.ID]component.Component{ storageID: &mockStorageExtension{GetClientError: storageError}, } host := &mockHost{ext: extensions} @@ -746,10 +746,10 @@ func tagsMatchLabelKeys(tags []tag.Tag, keys []metricdata.LabelKey, labels []met type mockHost struct { component.Host - ext map[component.ID]component.Extension + ext map[component.ID]component.Component } -func (nh *mockHost) GetExtensions() map[component.ID]component.Extension { +func (nh *mockHost) GetExtensions() map[component.ID]component.Component { return nh.ext } diff --git a/processor/memorylimiterprocessor/memorylimiter_test.go b/processor/memorylimiterprocessor/memorylimiter_test.go index 0ebe99d4ab2..31d51790cf1 100644 --- a/processor/memorylimiterprocessor/memorylimiter_test.go +++ b/processor/memorylimiterprocessor/memorylimiter_test.go @@ -430,8 +430,8 @@ type host struct { component.Host } -func (h *host) GetExtensions() map[component.ID]component.Extension { - ret := make(map[component.ID]component.Extension) +func (h *host) GetExtensions() map[component.ID]component.Component { + ret := make(map[component.ID]component.Component) ret[component.NewID("ballast")] = &ballastExtension{ballastSize: h.ballastSize} return ret } diff --git a/receiver/scraperhelper/scrapercontroller.go b/receiver/scraperhelper/scrapercontroller.go index 7c6f52c6679..55759422e09 100644 --- a/receiver/scraperhelper/scrapercontroller.go +++ b/receiver/scraperhelper/scrapercontroller.go @@ -95,7 +95,7 @@ func NewScraperControllerReceiver( set component.ReceiverCreateSettings, nextConsumer consumer.Metrics, options ...ScraperControllerOption, -) (component.Receiver, error) { +) (component.Component, error) { if nextConsumer == nil { return nil, component.ErrNilNextConsumer } diff --git a/service/extensions/extensions.go b/service/extensions/extensions.go index 1e20f430290..8bc1beb0d9b 100644 --- a/service/extensions/extensions.go +++ b/service/extensions/extensions.go @@ -83,8 +83,8 @@ func (bes *Extensions) NotifyPipelineNotReady() error { return errs } -func (bes *Extensions) GetExtensions() map[component.ID]component.Extension { - result := make(map[component.ID]component.Extension, len(bes.extMap)) +func (bes *Extensions) GetExtensions() map[component.ID]component.Component { + result := make(map[component.ID]component.Component, len(bes.extMap)) for extID, v := range bes.extMap { result[extID] = v } diff --git a/service/host.go b/service/host.go index 31be010608d..607cb0c6b85 100644 --- a/service/host.go +++ b/service/host.go @@ -52,10 +52,10 @@ func (host *serviceHost) GetFactory(kind component.Kind, componentType component return nil } -func (host *serviceHost) GetExtensions() map[component.ID]component.Extension { +func (host *serviceHost) GetExtensions() map[component.ID]component.Component { return host.extensions.GetExtensions() } -func (host *serviceHost) GetExporters() map[component.DataType]map[component.ID]component.Exporter { +func (host *serviceHost) GetExporters() map[component.DataType]map[component.ID]component.Component { return host.pipelines.GetExporters() } diff --git a/service/internal/pipelines/pipelines.go b/service/internal/pipelines/pipelines.go index 24907012a86..726ca95e6fa 100644 --- a/service/internal/pipelines/pipelines.go +++ b/service/internal/pipelines/pipelines.go @@ -59,8 +59,8 @@ type builtPipeline struct { type Pipelines struct { telemetry component.TelemetrySettings - allReceivers map[component.DataType]map[component.ID]component.Receiver - allExporters map[component.DataType]map[component.ID]component.Exporter + allReceivers map[component.DataType]map[component.ID]component.Component + allExporters map[component.DataType]map[component.ID]component.Component pipelines map[component.ID]*builtPipeline } @@ -139,12 +139,12 @@ func (bps *Pipelines) ShutdownAll(ctx context.Context) error { return errs } -func (bps *Pipelines) GetExporters() map[component.DataType]map[component.ID]component.Exporter { - exportersMap := make(map[component.DataType]map[component.ID]component.Exporter) +func (bps *Pipelines) GetExporters() map[component.DataType]map[component.ID]component.Component { + exportersMap := make(map[component.DataType]map[component.ID]component.Component) - exportersMap[component.DataTypeTraces] = make(map[component.ID]component.Exporter, len(bps.allExporters[component.DataTypeTraces])) - exportersMap[component.DataTypeMetrics] = make(map[component.ID]component.Exporter, len(bps.allExporters[component.DataTypeMetrics])) - exportersMap[component.DataTypeLogs] = make(map[component.ID]component.Exporter, len(bps.allExporters[component.DataTypeLogs])) + exportersMap[component.DataTypeTraces] = make(map[component.ID]component.Component, len(bps.allExporters[component.DataTypeTraces])) + exportersMap[component.DataTypeMetrics] = make(map[component.ID]component.Component, len(bps.allExporters[component.DataTypeMetrics])) + exportersMap[component.DataTypeLogs] = make(map[component.ID]component.Component, len(bps.allExporters[component.DataTypeLogs])) for dt, expByID := range bps.allExporters { for expID, exp := range expByID { @@ -208,8 +208,8 @@ type Settings struct { func Build(ctx context.Context, set Settings) (*Pipelines, error) { exps := &Pipelines{ telemetry: set.Telemetry, - allReceivers: make(map[component.DataType]map[component.ID]component.Receiver), - allExporters: make(map[component.DataType]map[component.ID]component.Exporter), + allReceivers: make(map[component.DataType]map[component.ID]component.Component), + allExporters: make(map[component.DataType]map[component.ID]component.Component), pipelines: make(map[component.ID]*builtPipeline, len(set.PipelineConfigs)), } @@ -221,7 +221,7 @@ func Build(ctx context.Context, set Settings) (*Pipelines, error) { for pipelineID, pipeline := range set.PipelineConfigs { // The data type of the pipeline defines what data type each exporter is expected to receive. if _, ok := exps.allExporters[pipelineID.Type()]; !ok { - exps.allExporters[pipelineID.Type()] = make(map[component.ID]component.Exporter) + exps.allExporters[pipelineID.Type()] = make(map[component.ID]component.Component) } expByID := exps.allExporters[pipelineID.Type()] @@ -306,7 +306,7 @@ func Build(ctx context.Context, set Settings) (*Pipelines, error) { for pipelineID, pipeline := range set.PipelineConfigs { // The data type of the pipeline defines what data type each exporter is expected to receive. if _, ok := exps.allReceivers[pipelineID.Type()]; !ok { - exps.allReceivers[pipelineID.Type()] = make(map[component.ID]component.Receiver) + exps.allReceivers[pipelineID.Type()] = make(map[component.ID]component.Component) } recvByID := exps.allReceivers[pipelineID.Type()] bp := exps.pipelines[pipelineID] @@ -339,7 +339,7 @@ func buildExporter( factories map[component.Type]component.ExporterFactory, id component.ID, pipelineID component.ID, -) (component.Exporter, error) { +) (component.Component, error) { cfg, existsCfg := cfgs[id] if !existsCfg { return nil, fmt.Errorf("exporter %q is not configured", id) @@ -365,7 +365,7 @@ func buildExporter( return exp, nil } -func createExporter(ctx context.Context, set component.ExporterCreateSettings, cfg component.ExporterConfig, id component.ID, pipelineID component.ID, factory component.ExporterFactory) (component.Exporter, error) { +func createExporter(ctx context.Context, set component.ExporterCreateSettings, cfg component.ExporterConfig, id component.ID, pipelineID component.ID, factory component.ExporterFactory) (component.Component, error) { switch pipelineID.Type() { case component.DataTypeTraces: return factory.CreateTracesExporter(ctx, set, cfg) @@ -433,7 +433,7 @@ func buildProcessor(ctx context.Context, id component.ID, pipelineID component.ID, next baseConsumer, -) (component.Processor, error) { +) (component.Component, error) { procCfg, existsCfg := cfgs[id] if !existsCfg { return nil, fmt.Errorf("processor %q is not configured", id) @@ -458,7 +458,7 @@ func buildProcessor(ctx context.Context, return proc, nil } -func createProcessor(ctx context.Context, set component.ProcessorCreateSettings, cfg component.ProcessorConfig, id component.ID, pipelineID component.ID, next baseConsumer, factory component.ProcessorFactory) (component.Processor, error) { +func createProcessor(ctx context.Context, set component.ProcessorCreateSettings, cfg component.ProcessorConfig, id component.ID, pipelineID component.ID, next baseConsumer, factory component.ProcessorFactory) (component.Component, error) { switch pipelineID.Type() { case component.DataTypeTraces: return factory.CreateTracesProcessor(ctx, set, cfg, next.(consumer.Traces)) @@ -499,7 +499,7 @@ func buildReceiver(ctx context.Context, id component.ID, pipelineID component.ID, nexts []baseConsumer, -) (component.Receiver, error) { +) (component.Component, error) { cfg, existsCfg := cfgs[id] if !existsCfg { return nil, fmt.Errorf("receiver %q is not configured", id) @@ -525,7 +525,7 @@ func buildReceiver(ctx context.Context, return recv, nil } -func createReceiver(ctx context.Context, set component.ReceiverCreateSettings, cfg component.ReceiverConfig, id component.ID, pipelineID component.ID, nexts []baseConsumer, factory component.ReceiverFactory) (component.Receiver, error) { +func createReceiver(ctx context.Context, set component.ReceiverCreateSettings, cfg component.ReceiverConfig, id component.ID, pipelineID component.ID, nexts []baseConsumer, factory component.ReceiverFactory) (component.Component, error) { switch pipelineID.Type() { case component.DataTypeTraces: var consumers []consumer.Traces